96 lines
3.3 KiB
Python
96 lines
3.3 KiB
Python
# python -m serial.tools.list_ports
|
|
|
|
import sys
|
|
import time
|
|
#import binascii
|
|
import base64
|
|
import serial
|
|
import threading
|
|
import json
|
|
|
|
ctrlC = bytes.fromhex("03")
|
|
ctrlD = bytes.fromhex("04")
|
|
|
|
class ComWithDongle:
|
|
"""Class to manage communication with dongle, over virtual COM port"""
|
|
def __init__(self, comPort:str, peripheralName:str, onMsgReceived, debug=False):
|
|
""":param comPort: name of COM port used by dongle
|
|
:param peripheralName: name of BLE peripheral
|
|
:param onMsgReceived: function to call when a message from peripheral is received
|
|
:param debug: when True, print debug messages received from dongle"""
|
|
try:
|
|
self.ser = serial.Serial(port=comPort, baudrate=115200, timeout=2)
|
|
except serial.SerialException:
|
|
exit(f"no device on port {comPort}")
|
|
self.bleConnected = threading.Semaphore(0)
|
|
self.messageSent = threading.Semaphore(0)
|
|
self.onMsgReceived = onMsgReceived
|
|
self.debug = debug
|
|
self.resetDongle()
|
|
threading.Thread(name='readComPort', target=self.readFromComPort, daemon=True).start()
|
|
# first message over COM port to dongle is to define BLE peripheral to connect on
|
|
self.sendDict({'type':'connect','name':peripheralName})
|
|
timeoutNotReached = self.bleConnected.acquire(timeout=5)
|
|
if not timeoutNotReached:
|
|
exit(f'unable to connect to peripheral "{peripheralName}"')
|
|
|
|
def resetDongle(self):
|
|
self.ser.write(ctrlC)
|
|
self.ser.write(ctrlD)
|
|
self.ser.flush()
|
|
time.sleep(2)
|
|
|
|
def sendDict(self, msg:dict):
|
|
self.ser.write(json.dumps(msg).encode("utf-8") + b'\r')
|
|
|
|
def sendMsg(self, msg:str|bytes):
|
|
if isinstance(msg, str):
|
|
self.sendDict({'type':'msg', 'format':'str', 'string':msg})
|
|
else:
|
|
#self.sendDict({'type':'msg', 'base64':binascii.b2a_base64(msg).decode("utf-8")})
|
|
#b = binascii.b2a_base64(msg).decode("utf-8").rstrip()
|
|
#b = base64.b64encode(msg).decode("utf-8").rstrip()
|
|
b = base64.b64encode(msg).decode("utf-8").rstrip()
|
|
if self.debug: print('sendMsg', msg, '=>', b, flush=True)
|
|
self.sendDict({'type':'msg', 'format':'base64', 'string':b})
|
|
self.messageSent.acquire(timeout=2)
|
|
|
|
def disconnect(self):
|
|
self.sendDict({'type': 'disconnect'})
|
|
|
|
def readFromComPort(self):
|
|
while True:
|
|
line = self.ser.readline().rstrip()
|
|
# valid message can't be empty
|
|
if type(line) is not bytes or line == b'':
|
|
# empty message received after a timeout on serial connection, to ignore
|
|
continue
|
|
line = line.decode("utf-8")
|
|
try:
|
|
receivedMsgDict = json.loads(line)
|
|
except json.decoder.JSONDecodeError:
|
|
# this is not a dictionnary, just a debug message
|
|
if self.debug: print('from COM:', line, flush=True)
|
|
continue
|
|
msgType = receivedMsgDict['type']
|
|
if msgType == 'connected':
|
|
self.bleConnected.release()
|
|
elif msgType == 'sentMessage':
|
|
self.messageSent.release()
|
|
elif msgType == 'msgFromBle':
|
|
if receivedMsgDict['format'] == 'str':
|
|
self.onMsgReceived(receivedMsgDict['string'])
|
|
else:
|
|
if self.debug: print('base64 msg from BLE:', len(receivedMsgDict['string']), receivedMsgDict['string'])
|
|
#self.onMsgReceived(binascii.a2b_base64(receivedMsgDict['string']))
|
|
self.onMsgReceived(base64.b64decode(receivedMsgDict['string']))
|
|
|
|
elif msgType == 'debug':
|
|
if self.debug:
|
|
del(receivedMsgDict['type'])
|
|
print('debug COM:', receivedMsgDict)
|
|
elif msgType in ['connect', 'msg']:
|
|
pass
|
|
else:
|
|
print('unknown msg type', receivedMsgDict)
|