24H_du_code_2026/ble/RobotBleServer.py
2026-03-22 00:26:24 +01:00

114 lines
3.8 KiB
Python

# To find out which COM port the board uses when it is connected to a PC, run:
#   python -m serial.tools.list_ports
import binascii
import sys
sys.path.append("")
from micropython import const
import aioble
import bluetooth
import struct
# 16-bit UUIDs of the GATT service and of the characteristic registered by RobotBleServer.
_SERVICE_UUID = bluetooth.UUID(0x1234)
_CHAR_UUID = bluetooth.UUID(0x1235)
# How frequently to send advertising beacons.
# NOTE(review): this value is passed as the first argument of aioble.advertise(),
# which presumably expects microseconds (250_000 us = 250 ms) despite the _MS
# suffix of this name -- confirm against the aioble documentation.
_ADV_INTERVAL_MS = 250_000
# Maximum number of payload bytes carried by a single BLE message;
# longer payloads are split into several chunk messages.
MAX_MSG_DATA_LENGTH = const(18)
# Command codes carried in the first byte of every message.
_COMMAND_DONE = const(0) # not referenced by the code visible in this file
_COMMAND_SENDDATA = const(1) # complete string, or last chunk of a chunked string
_COMMAND_SENDCHUNK = const(2) # intermediate chunk of a string; the last chunk uses _COMMAND_SENDDATA
_COMMAND_SENDBYTESDATA = const(3) # complete base64-encoded bytes sequence, or last chunk of a chunked one
_COMMAND_SENDBYTESCHUNK = const(4) # intermediate chunk of a base64-encoded bytes sequence; the last chunk uses _COMMAND_SENDBYTESDATA
class RobotBleServer:
"""Class to manage connection with BLE"""
def __init__(self, robotName:str, onMsgReceived):
""":param robotName: name to use in advertising
:param onMsgReceived: function to call when a message is received"""
self.robotName = robotName
self.onMsgReceived = onMsgReceived
# Register GATT server.
service = aioble.Service(_SERVICE_UUID)
self.characteristic = aioble.Characteristic(service, _CHAR_UUID, write=True, notify=True)
aioble.register_services(service)
self.connection = None
def sendMessage(self, msg:str|bytes):
"""Send a message over BLE
Message can be a string or a bytes sequence (maximum 18 charaters/bytes per message)
:param msg: message to send"""
if type(msg) == str:
encodedMsg = msg.encode()
sendMsgType, sendChunkMsgType = _COMMAND_SENDDATA, _COMMAND_SENDCHUNK
elif type(msg) == bytes:
#msg = binascii.b2a_base64(msg).encode()
encodedMsg = binascii.b2a_base64(msg).rstrip()
sendMsgType, sendChunkMsgType = _COMMAND_SENDBYTESDATA, _COMMAND_SENDBYTESCHUNK
else:
raise Exception('unsupported message type', type(msg))
print('encode', type(msg), msg, '=>', encodedMsg)
while len(encodedMsg) > MAX_MSG_DATA_LENGTH:
chunk = encodedMsg[:MAX_MSG_DATA_LENGTH]
self.characteristic.notify(self.connection, struct.pack("<B", sendChunkMsgType) + chunk)
encodedMsg = encodedMsg[MAX_MSG_DATA_LENGTH:]
print('sent chunk', chunk)
self.characteristic.notify(self.connection, struct.pack("<B", sendMsgType) + encodedMsg)
print('sent last', encodedMsg)
async def bleTask(self):
"""Loop to wait for incoming messages over BLE.
When a received message is complete, call function defined in self.onMsgReceived
When BLE connection is closed, stop this function"""
try:
with self.connection.timeout(None):
dataChunk = ''
msgId = 0
while True:
await self.characteristic.written()
msg = self.characteristic.read()
#self.characteristic.write(b"")
if len(msg) < 3:
continue
# Message is <command><seq><data>.
command = msg[0]
op_seq = int(msg[1])
msgData = msg[2:].decode()
#print('MSG=', msg)
if command in (_COMMAND_SENDCHUNK, _COMMAND_SENDBYTESCHUNK):
dataChunk += msgData
print('received chunk', msgData, '=>', dataChunk)
elif command in (_COMMAND_SENDDATA, _COMMAND_SENDBYTESDATA):
data = dataChunk + msgData
dataChunk = ''
if command == _COMMAND_SENDBYTESDATA:
data = binascii.a2b_base64(data)
#print('received data:', data)
print('received:', len(data), msgId, type(data), data)
self.onMsgReceived(data)
msgId += 1
except aioble.DeviceDisconnectedError:
print('disconnected BLE')
return
async def communicationTask(self):
"""Loop to advertise and wait for connection.
When connection is established, start task to read incoming messages"""
while True:
print("Waiting for connection")
self.connection = await aioble.advertise(
_ADV_INTERVAL_MS,
name=self.robotName,
services=[_SERVICE_UUID],
)
print("Connection from", self.connection.device)
await self.bleTask()
await self.connection.disconnected()
self.connection = None