diff --git a/ble/ComWithDongle.py b/ble/ComWithDongle.py new file mode 100644 index 0000000..d13351d --- /dev/null +++ b/ble/ComWithDongle.py @@ -0,0 +1,95 @@ +# python -m serial.tools.list_ports + +import sys +import time +#import binascii +import base64 +import serial +import threading +import json + +ctrlC = bytes.fromhex("03") +ctrlD = bytes.fromhex("04") + +class ComWithDongle: + """Class to manage communication with dongle, over virtual COM port""" + def __init__(self, comPort:str, peripheralName:str, onMsgReceived, debug=False): + """:param comPort: name of COM port used by dongle + :param peripheralName: name of BLE peripheral + :param onMsgReceived: function to call when a message from peripheral is received + :param debug: when True, print debug messages received from dongle""" + try: + self.ser = serial.Serial(port=comPort, baudrate=115200, timeout=2) + except serial.SerialException: + exit(f"no device on port {comPort}") + self.bleConnected = threading.Semaphore(0) + self.messageSent = threading.Semaphore(0) + self.onMsgReceived = onMsgReceived + self.debug = debug + self.resetDongle() + threading.Thread(name='readComPort', target=self.readFromComPort, daemon=True).start() + # first message over COM port to dongle is to define BLE peripheral to connect on + self.sendDict({'type':'connect','name':peripheralName}) + timeoutNotReached = self.bleConnected.acquire(timeout=5) + if not timeoutNotReached: + exit(f'unable to connect to peripheral "{peripheralName}"') + + def resetDongle(self): + self.ser.write(ctrlC) + self.ser.write(ctrlD) + self.ser.flush() + time.sleep(2) + + def sendDict(self, msg:dict): + self.ser.write(json.dumps(msg).encode("utf-8") + b'\r') + + def sendMsg(self, msg:str|bytes): + if isinstance(msg, str): + self.sendDict({'type':'msg', 'format':'str', 'string':msg}) + else: + #self.sendDict({'type':'msg', 'base64':binascii.b2a_base64(msg).decode("utf-8")}) + #b = binascii.b2a_base64(msg).decode("utf-8").rstrip() + #b = base64.b64encode(msg).decode("utf-8").rstrip() + b = base64.b64encode(msg).decode("utf-8").rstrip() + if self.debug: print('sendMsg', msg, '=>', b, flush=True) + self.sendDict({'type':'msg', 'format':'base64', 'string':b}) + self.messageSent.acquire(timeout=2) + + def disconnect(self): + self.sendDict({'type': 'disconnect'}) + + def readFromComPort(self): + while True: + line = self.ser.readline().rstrip() + # valid message can't be empty + if type(line) is not bytes or line == b'': + # empty message received after a timeout on serial connection, to ignore + continue + line = line.decode("utf-8") + try: + receivedMsgDict = json.loads(line) + except json.decoder.JSONDecodeError: + # this is not a dictionnary, just a debug message + if self.debug: print('from COM:', line, flush=True) + continue + msgType = receivedMsgDict['type'] + if msgType == 'connected': + self.bleConnected.release() + elif msgType == 'sentMessage': + self.messageSent.release() + elif msgType == 'msgFromBle': + if receivedMsgDict['format'] == 'str': + self.onMsgReceived(receivedMsgDict['string']) + else: + if self.debug: print('base64 msg from BLE:', len(receivedMsgDict['string']), receivedMsgDict['string']) + #self.onMsgReceived(binascii.a2b_base64(receivedMsgDict['string'])) + self.onMsgReceived(base64.b64decode(receivedMsgDict['string'])) + + elif msgType == 'debug': + if self.debug: + del(receivedMsgDict['type']) + print('debug COM:', receivedMsgDict) + elif msgType in ['connect', 'msg']: + pass + else: + print('unknown msg type', receivedMsgDict) diff --git a/ble/README.md b/ble/README.md new file mode 100644 index 0000000..5bbdea5 --- /dev/null +++ b/ble/README.md @@ -0,0 +1,48 @@ +# BLE example +Here is an example of driver to send messages using BLE + +# Setup on robot (or other BLE advertiser) +Copy following files to robot + +- aioble/\* +- RobotBleServer.py +- mainRobotTestBLE.py (to rename as main.py) + +You can use script toRobot.sh for that, for example when run from a Windows git bash, +if robot is connected on drive D:, you can run +> ./toRobot.sh /d + +# Setup on USB dongle +Copy following files to robot + +- aioble/\* +- mainDongle.py (to rename as main.py) + +You can use script toDongle.sh for that, for example when run from a Windows git bash, +if dongle is connected on drive E:, you can run +> ./toDongle.sh /e + +# Setup on computer +You need pyserial module for python. You can install it using command + +> python -m pip install pyserial + +or if a proxy is required +> python -m pip install --proxy \ pyserial + +Then run following command +> python mainPcTestBLE.py --portcom \ + +To know COM port to use as argument, run following command before and after dongle connection: +> python -m serial.tools.list_ports + +Port in second result but not in first result is port used by dongle. + +# Connect on the good robot +When several robots are started at same time, they shall have a unique identifier so you can connect over BLE on the good robot. +For that, you shall replace "myTeamName" by a unique identifer (for example the name of your team) in following files: +- mainRobotTestBLE.py +- mainPcTestBLE.py + +# Note relative to BLE +The Bluetooth is a connection with a limited transfer rate. If you try to send a lot of messages in a short period of time, or transfer long messages, the BLE driver will do it's best to transfer all data but expect delay to receive messages on the other side. diff --git a/ble/RobotBleServer.py b/ble/RobotBleServer.py new file mode 100644 index 0000000..0dc56ce --- /dev/null +++ b/ble/RobotBleServer.py @@ -0,0 +1,113 @@ +# to know COM port used when connected on PC: +# python -m serial.tools.list_ports + +import binascii +import sys +sys.path.append("") +from micropython import const +import aioble +import bluetooth +import struct + +_SERVICE_UUID = bluetooth.UUID(0x1234) +_CHAR_UUID = bluetooth.UUID(0x1235) + +# How frequently to send advertising beacons. +_ADV_INTERVAL_MS = 250_000 + +MAX_MSG_DATA_LENGTH = const(18) + +_COMMAND_DONE = const(0) +_COMMAND_SENDDATA = const(1) +_COMMAND_SENDCHUNK = const(2) # send chunk of string, use _COMMAND_SENDDATA for last chunk +_COMMAND_SENDBYTESDATA = const(3) +_COMMAND_SENDBYTESCHUNK = const(4) # send chunk of string base64 formatted, use _COMMAND_SENDBYTESDATA for last chunk + +class RobotBleServer: + """Class to manage connection with BLE""" + def __init__(self, robotName:str, onMsgReceived): + """:param robotName: name to use in advertising + :param onMsgReceived: function to call when a message is received""" + self.robotName = robotName + self.onMsgReceived = onMsgReceived + # Register GATT server. + service = aioble.Service(_SERVICE_UUID) + self.characteristic = aioble.Characteristic(service, _CHAR_UUID, write=True, notify=True) + aioble.register_services(service) + self.connection = None + + def sendMessage(self, msg:str|bytes): + """Send a message over BLE + Message can be a string or a bytes sequence (maximum 18 charaters/bytes per message) + :param msg: message to send""" + if type(msg) == str: + encodedMsg = msg.encode() + sendMsgType, sendChunkMsgType = _COMMAND_SENDDATA, _COMMAND_SENDCHUNK + elif type(msg) == bytes: + #msg = binascii.b2a_base64(msg).encode() + encodedMsg = binascii.b2a_base64(msg).rstrip() + sendMsgType, sendChunkMsgType = _COMMAND_SENDBYTESDATA, _COMMAND_SENDBYTESCHUNK + else: + raise Exception('unsupported message type', type(msg)) + print('encode', type(msg), msg, '=>', encodedMsg) + while len(encodedMsg) > MAX_MSG_DATA_LENGTH: + chunk = encodedMsg[:MAX_MSG_DATA_LENGTH] + self.characteristic.notify(self.connection, struct.pack(". + command = msg[0] + op_seq = int(msg[1]) + msgData = msg[2:].decode() + #print('MSG=', msg) + + if command in (_COMMAND_SENDCHUNK, _COMMAND_SENDBYTESCHUNK): + dataChunk += msgData + print('received chunk', msgData, '=>', dataChunk) + elif command in (_COMMAND_SENDDATA, _COMMAND_SENDBYTESDATA): + data = dataChunk + msgData + dataChunk = '' + if command == _COMMAND_SENDBYTESDATA: + data = binascii.a2b_base64(data) + #print('received data:', data) + print('received:', len(data), msgId, type(data), data) + self.onMsgReceived(data) + msgId += 1 + except aioble.DeviceDisconnectedError: + print('disconnected BLE') + return + + async def communicationTask(self): + """Loop to advertise and wait for connection. + When connection is established, start task to read incoming messages""" + while True: + print("Waiting for connection") + self.connection = await aioble.advertise( + _ADV_INTERVAL_MS, + name=self.robotName, + services=[_SERVICE_UUID], + ) + print("Connection from", self.connection.device) + await self.bleTask() + await self.connection.disconnected() + self.connection = None + diff --git a/ble/aioble/__init__.py b/ble/aioble/__init__.py new file mode 100644 index 0000000..dde89f5 --- /dev/null +++ b/ble/aioble/__init__.py @@ -0,0 +1,32 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +from .device import Device, DeviceDisconnectedError +from .core import log_info, log_warn, log_error, GattError, config, stop + +try: + from .peripheral import advertise +except: + log_info("Peripheral support disabled") + +try: + from .central import scan +except: + log_info("Central support disabled") + +try: + from .server import ( + Service, + Characteristic, + BufferedCharacteristic, + Descriptor, + register_services, + ) +except: + log_info("GATT server support disabled") + + +ADDR_PUBLIC = const(0) +ADDR_RANDOM = const(1) diff --git a/ble/aioble/central.py b/ble/aioble/central.py new file mode 100644 index 0000000..adfc972 --- /dev/null +++ b/ble/aioble/central.py @@ -0,0 +1,297 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import bluetooth +import struct + +import uasyncio as asyncio + +from .core import ( + ensure_active, + ble, + log_info, + log_error, + log_warn, + register_irq_handler, +) +from .device import Device, DeviceConnection, DeviceTimeout + + +_IRQ_SCAN_RESULT = const(5) +_IRQ_SCAN_DONE = const(6) + +_IRQ_PERIPHERAL_CONNECT = const(7) +_IRQ_PERIPHERAL_DISCONNECT = const(8) + +_ADV_IND = const(0) +_ADV_DIRECT_IND = const(1) +_ADV_SCAN_IND = const(2) +_ADV_NONCONN_IND = const(3) +_SCAN_RSP = const(4) + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_SHORT_NAME = const(0x08) +_ADV_TYPE_UUID16_INCOMPLETE = const(0x2) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_UUID32_INCOMPLETE = const(0x4) +_ADV_TYPE_UUID32_COMPLETE = const(0x5) +_ADV_TYPE_UUID128_INCOMPLETE = const(0x6) +_ADV_TYPE_UUID128_COMPLETE = const(0x7) +_ADV_TYPE_APPEARANCE = const(0x19) +_ADV_TYPE_MANUFACTURER = const(0xFF) + + +# Keep track of the active scanner so IRQs can be delivered to it. +_active_scanner = None + + +# Set of devices that are waiting for the peripheral connect IRQ. +_connecting = set() + + +def _central_irq(event, data): + # Send results and done events to the active scanner instance. + if event == _IRQ_SCAN_RESULT: + addr_type, addr, adv_type, rssi, adv_data = data + if not _active_scanner: + return + _active_scanner._queue.append((addr_type, bytes(addr), adv_type, rssi, bytes(adv_data))) + _active_scanner._event.set() + elif event == _IRQ_SCAN_DONE: + if not _active_scanner: + return + _active_scanner._done = True + _active_scanner._event.set() + + # Peripheral connect must be in response to a pending connection, so find + # it in the pending connection set. + elif event == _IRQ_PERIPHERAL_CONNECT: + conn_handle, addr_type, addr = data + + for d in _connecting: + if d.addr_type == addr_type and d.addr == addr: + # Allow connect() to complete. + connection = d._connection + connection._conn_handle = conn_handle + connection._event.set() + break + + # Find the active device connection for this connection handle. + elif event == _IRQ_PERIPHERAL_DISCONNECT: + conn_handle, _, _ = data + if connection := DeviceConnection._connected.get(conn_handle, None): + # Tell the device_task that it should terminate. + connection._event.set() + + +def _central_shutdown(): + global _active_scanner, _connecting + _active_scanner = None + _connecting = set() + + +register_irq_handler(_central_irq, _central_shutdown) + + +# Cancel an in-progress scan. +async def _cancel_pending(): + if _active_scanner: + await _active_scanner.cancel() + + +# Start connecting to a peripheral. +# Call device.connect() rather than using method directly. +async def _connect(connection, timeout_ms): + device = connection.device + if device in _connecting: + return + + # Enable BLE and cancel in-progress scans. + ensure_active() + await _cancel_pending() + + # Allow the connected IRQ to find the device by address. + _connecting.add(device) + + # Event will be set in the connected IRQ, and then later + # re-used to notify disconnection. + connection._event = connection._event or asyncio.ThreadSafeFlag() + + try: + with DeviceTimeout(None, timeout_ms): + ble.gap_connect(device.addr_type, device.addr) + + # Wait for the connected IRQ. + await connection._event.wait() + assert connection._conn_handle is not None + + # Register connection handle -> device. + DeviceConnection._connected[connection._conn_handle] = connection + finally: + # After timeout, don't hold a reference and ignore future events. + _connecting.remove(device) + + +# Represents a single device that has been found during a scan. The scan +# iterator will return the same ScanResult instance multiple times as its data +# changes (i.e. changing RSSI or advertising data). +class ScanResult: + def __init__(self, device): + self.device = device + self.adv_data = None + self.resp_data = None + self.rssi = None + self.connectable = False + + # New scan result available, return true if it changes our state. + def _update(self, adv_type, rssi, adv_data): + updated = False + + if rssi != self.rssi: + self.rssi = rssi + updated = True + + if adv_type in (_ADV_IND, _ADV_NONCONN_IND): + if adv_data != self.adv_data: + self.adv_data = adv_data + self.connectable = adv_type == _ADV_IND + updated = True + elif adv_type == _ADV_SCAN_IND: + if adv_data != self.adv_data and self.resp_data: + updated = True + self.adv_data = adv_data + elif adv_type == _SCAN_RSP and adv_data: + if adv_data != self.resp_data: + self.resp_data = adv_data + updated = True + + return updated + + def __str__(self): + return "Scan result: {} {}".format(self.device, self.rssi) + + # Gets all the fields for the specified types. + def _decode_field(self, *adv_type): + # Advertising payloads are repeated packets of the following form: + # 1 byte data length (N + 1) + # 1 byte type (see constants below) + # N bytes type-specific data + for payload in (self.adv_data, self.resp_data): + if not payload: + continue + i = 0 + while i + 1 < len(payload): + if payload[i + 1] in adv_type: + yield payload[i + 2 : i + payload[i] + 1] + i += 1 + payload[i] + + # Returns the value of the complete (or shortened) advertised name, if available. + def name(self): + for n in self._decode_field(_ADV_TYPE_NAME, _ADV_TYPE_SHORT_NAME): + return str(n, "utf-8") if n else "" + + # Generator that enumerates the service UUIDs that are advertised. + def services(self): + for u in self._decode_field(_ADV_TYPE_UUID16_INCOMPLETE, _ADV_TYPE_UUID16_COMPLETE): + yield bluetooth.UUID(struct.unpack(" value_handle else value_handle + 2 + + super().__init__(value_handle, properties, uuid) + + if properties & _FLAG_NOTIFY: + # Fired when a notification arrives. + self._notify_event = asyncio.ThreadSafeFlag() + # Data for the most recent notification. + self._notify_queue = deque((), 1) + if properties & _FLAG_INDICATE: + # Same for indications. + self._indicate_event = asyncio.ThreadSafeFlag() + self._indicate_queue = deque((), 1) + + def __str__(self): + return "Characteristic: {} {} {} {}".format( + self._end_handle, self._value_handle, self.properties, self.uuid + ) + + def _connection(self): + return self.service.connection + + # Search for a specific descriptor by uuid. + async def descriptor(self, uuid, timeout_ms=2000): + result = None + # Make sure loop runs to completion. + async for descriptor in self.descriptors(timeout_ms): + if not result and descriptor.uuid == uuid: + # Keep first result. + result = descriptor + return result + + # Search for all services (optionally by uuid). + # Use with `async for`, e.g. + # async for descriptor in characteristic.descriptors(): + # Note: must allow the loop to run to completion. + def descriptors(self, timeout_ms=2000): + return ClientDiscover(self.connection, ClientDescriptor, self, timeout_ms) + + # For ClientDiscover + def _start_discovery(service, uuid=None): + ble.gattc_discover_characteristics( + service.connection._conn_handle, + service._start_handle, + service._end_handle, + uuid, + ) + + # Helper for notified() and indicated(). + async def _notified_indicated(self, queue, event, timeout_ms): + # Ensure that events for this connection can route to this characteristic. + self._register_with_connection() + + # If the queue is empty, then we need to wait. However, if the queue + # has a single item, we also need to do a no-op wait in order to + # clear the event flag (because the queue will become empty and + # therefore the event should be cleared). + if len(queue) <= 1: + with self._connection().timeout(timeout_ms): + await event.wait() + + # Either we started > 1 item, or the wait completed successfully, return + # the front of the queue. + return queue.popleft() + + # Wait for the next notification. + # Will return immediately if a notification has already been received. + async def notified(self, timeout_ms=None): + self._check(_FLAG_NOTIFY) + return await self._notified_indicated(self._notify_queue, self._notify_event, timeout_ms) + + def _on_notify_indicate(self, queue, event, data): + # If we've gone from empty to one item, then wake something + # blocking on `await char.notified()` (or `await char.indicated()`). + wake = len(queue) == 0 + # Append the data. By default this is a deque with max-length==1, so it + # replaces. But if capture is enabled then it will append. + queue.append(data) + if wake: + # Queue is now non-empty. If something is waiting, it will be + # worken. If something isn't waiting right now, then a future + # caller to `await char.written()` will see the queue is + # non-empty, and wait on the event if it's going to empty the + # queue. + event.set() + + # Map an incoming notify IRQ to a registered characteristic. + def _on_notify(conn_handle, value_handle, notify_data): + if characteristic := ClientCharacteristic._find(conn_handle, value_handle): + characteristic._on_notify_indicate( + characteristic._notify_queue, characteristic._notify_event, notify_data + ) + + # Wait for the next indication. + # Will return immediately if an indication has already been received. + async def indicated(self, timeout_ms=None): + self._check(_FLAG_INDICATE) + return await self._notified_indicated( + self._indicate_queue, self._indicate_event, timeout_ms + ) + + # Map an incoming indicate IRQ to a registered characteristic. + def _on_indicate(conn_handle, value_handle, indicate_data): + if characteristic := ClientCharacteristic._find(conn_handle, value_handle): + characteristic._on_notify_indicate( + characteristic._indicate_queue, characteristic._indicate_event, indicate_data + ) + + # Write to the Client Characteristic Configuration to subscribe to + # notify/indications for this characteristic. + async def subscribe(self, notify=True, indicate=False): + # Ensure that the generated notifications are dispatched in case the app + # hasn't awaited on notified/indicated yet. + self._register_with_connection() + if cccd := await self.descriptor(bluetooth.UUID(_CCCD_UUID)): + await cccd.write(struct.pack(" 0: + print("[aioble] E:", *args) + + +def log_warn(*args): + if log_level > 1: + print("[aioble] W:", *args) + + +def log_info(*args): + if log_level > 2: + print("[aioble] I:", *args) + + +class GattError(Exception): + def __init__(self, status): + self._status = status + + +def ensure_active(): + if not ble.active(): + try: + from .security import load_secrets + + load_secrets() + except: + pass + ble.active(True) + + +def config(*args, **kwargs): + ensure_active() + return ble.config(*args, **kwargs) + + +# Because different functionality is enabled by which files are available the +# different modules can register their IRQ handlers and shutdown handlers +# dynamically. +_irq_handlers = [] +_shutdown_handlers = [] + + +def register_irq_handler(irq, shutdown): + if irq: + _irq_handlers.append(irq) + if shutdown: + _shutdown_handlers.append(shutdown) + + +def stop(): + ble.active(False) + for handler in _shutdown_handlers: + handler() + + +# Dispatch IRQs to the registered sub-modules. +def ble_irq(event, data): + log_info(event, data) + + for handler in _irq_handlers: + result = handler(event, data) + if result is not None: + return result + + +# TODO: Allow this to be injected. +ble = bluetooth.BLE() +ble.irq(ble_irq) diff --git a/ble/aioble/device.py b/ble/aioble/device.py new file mode 100644 index 0000000..265d621 --- /dev/null +++ b/ble/aioble/device.py @@ -0,0 +1,295 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import uasyncio as asyncio +import binascii + +from .core import ble, register_irq_handler, log_error + + +_IRQ_MTU_EXCHANGED = const(21) + + +# Raised by `with device.timeout()`. +class DeviceDisconnectedError(Exception): + pass + + +def _device_irq(event, data): + if event == _IRQ_MTU_EXCHANGED: + conn_handle, mtu = data + if device := DeviceConnection._connected.get(conn_handle, None): + device.mtu = mtu + if device._mtu_event: + device._mtu_event.set() + + +register_irq_handler(_device_irq, None) + + +# Context manager to allow an operation to be cancelled by timeout or device +# disconnection. Don't use this directly -- use `with connection.timeout(ms):` +# instead. +class DeviceTimeout: + def __init__(self, connection, timeout_ms): + self._connection = connection + self._timeout_ms = timeout_ms + + # We allow either (or both) connection and timeout_ms to be None. This + # allows this to be used either as a just-disconnect, just-timeout, or + # no-op. + + # This task is active while the operation is in progress. It sleeps + # until the timeout, and then cancels the working task. If the working + # task completes, __exit__ will cancel the sleep. + self._timeout_task = None + + # This is the task waiting for the actual operation to complete. + # Usually this is waiting on an event that will be set() by an IRQ + # handler. + self._task = asyncio.current_task() + + # Tell the connection that if it disconnects, it should cancel this + # operation (by cancelling self._task). + if connection: + connection._timeouts.append(self) + + async def _timeout_sleep(self): + try: + await asyncio.sleep_ms(self._timeout_ms) + except asyncio.CancelledError: + # The operation completed successfully and this timeout task was + # cancelled by __exit__. + return + + # The sleep completed, so we should trigger the timeout. Set + # self._timeout_task to None so that we can tell the difference + # between a disconnect and a timeout in __exit__. + self._timeout_task = None + self._task.cancel() + + def __enter__(self): + if self._timeout_ms: + # Schedule the timeout waiter. + self._timeout_task = asyncio.create_task(self._timeout_sleep()) + + def __exit__(self, exc_type, exc_val, exc_traceback): + # One of five things happened: + # 1 - The operation completed successfully. + # 2 - The operation timed out. + # 3 - The device disconnected. + # 4 - The operation failed for a different exception. + # 5 - The task was cancelled by something else. + + # Don't need the connection to tell us about disconnection anymore. + if self._connection: + self._connection._timeouts.remove(self) + + try: + if exc_type == asyncio.CancelledError: + # Case 2, we started a timeout and it's completed. + if self._timeout_ms and self._timeout_task is None: + raise asyncio.TimeoutError + + # Case 3, we have a disconnected device. + if self._connection and self._connection._conn_handle is None: + raise DeviceDisconnectedError + + # Case 5, something else cancelled us. + # Allow the cancellation to propagate. + return + + # Case 1 & 4. Either way, just stop the timeout task and let the + # exception (if case 4) propagate. + finally: + # In all cases, if the timeout is still running, cancel it. + if self._timeout_task: + self._timeout_task.cancel() + + +class Device: + def __init__(self, addr_type, addr): + # Public properties + self.addr_type = addr_type + self.addr = addr if len(addr) == 6 else binascii.unhexlify(addr.replace(":", "")) + self._connection = None + + def __eq__(self, rhs): + return self.addr_type == rhs.addr_type and self.addr == rhs.addr + + def __hash__(self): + return hash((self.addr_type, self.addr)) + + def __str__(self): + return "Device({}, {}{})".format( + "ADDR_PUBLIC" if self.addr_type == 0 else "ADDR_RANDOM", + self.addr_hex(), + ", CONNECTED" if self._connection else "", + ) + + def addr_hex(self): + return binascii.hexlify(self.addr, ":").decode() + + async def connect(self, timeout_ms=10000): + if self._connection: + return self._connection + + # Forward to implementation in central.py. + from .central import _connect + + await _connect(DeviceConnection(self), timeout_ms) + + # Start the device task that will clean up after disconnection. + self._connection._run_task() + return self._connection + + +class DeviceConnection: + # Global map of connection handle to active devices (for IRQ mapping). + _connected = {} + + def __init__(self, device): + self.device = device + device._connection = self + + self.encrypted = False + self.authenticated = False + self.bonded = False + self.key_size = False + self.mtu = None + + self._conn_handle = None + + # This event is fired by the IRQ both for connection and disconnection + # and controls the device_task. + self._event = None + + # If we're waiting for a pending MTU exchange. + self._mtu_event = None + + # In-progress client discovery instance (e.g. services, chars, + # descriptors) used for IRQ mapping. + self._discover = None + # Map of value handle to characteristic (so that IRQs with + # conn_handle,value_handle can route to them). See + # ClientCharacteristic._find for where this is used. + self._characteristics = {} + + self._task = None + + # DeviceTimeout instances that are currently waiting on this device + # and need to be notified if disconnection occurs. + self._timeouts = [] + + # Fired by the encryption update event. + self._pair_event = None + + # Active L2CAP channel for this device. + # TODO: Support more than one concurrent channel. + self._l2cap_channel = None + + # While connected, this tasks waits for disconnection then cleans up. + async def device_task(self): + assert self._conn_handle is not None + + # Wait for the (either central or peripheral) disconnected irq. + await self._event.wait() + + # Mark the device as disconnected. + del DeviceConnection._connected[self._conn_handle] + self._conn_handle = None + self.device._connection = None + + # Cancel any in-progress operations on this device. + for t in self._timeouts: + t._task.cancel() + + def _run_task(self): + # Event will be already created this if we initiated connection. + self._event = self._event or asyncio.ThreadSafeFlag() + + self._task = asyncio.create_task(self.device_task()) + + async def disconnect(self, timeout_ms=2000): + await self.disconnected(timeout_ms, disconnect=True) + + async def disconnected(self, timeout_ms=60000, disconnect=False): + if not self.is_connected(): + return + + # The task must have been created after successful connection. + assert self._task + + if disconnect: + try: + ble.gap_disconnect(self._conn_handle) + except OSError as e: + log_error("Disconnect", e) + + with DeviceTimeout(None, timeout_ms): + await self._task + + # Retrieve a single service matching this uuid. + async def service(self, uuid, timeout_ms=2000): + result = None + # Make sure loop runs to completion. + async for service in self.services(uuid, timeout_ms): + if not result and service.uuid == uuid: + result = service + return result + + # Search for all services (optionally by uuid). + # Use with `async for`, e.g. + # async for service in device.services(): + # Note: must allow the loop to run to completion. + # TODO: disconnection / timeout + def services(self, uuid=None, timeout_ms=2000): + from .client import ClientDiscover, ClientService + + return ClientDiscover(self, ClientService, self, timeout_ms, uuid) + + async def pair(self, *args, **kwargs): + from .security import pair + + await pair(self, *args, **kwargs) + + def is_connected(self): + return self._conn_handle is not None + + # Use with `with` to simplify disconnection and timeout handling. + def timeout(self, timeout_ms): + return DeviceTimeout(self, timeout_ms) + + async def exchange_mtu(self, mtu=None, timeout_ms=1000): + if not self.is_connected(): + raise ValueError("Not connected") + + if mtu: + ble.config(mtu=mtu) + + self._mtu_event = self._mtu_event or asyncio.ThreadSafeFlag() + ble.gattc_exchange_mtu(self._conn_handle) + with self.timeout(timeout_ms): + await self._mtu_event.wait() + return self.mtu + + # Wait for a connection on an L2CAP connection-oriented-channel. + async def l2cap_accept(self, psm, mtu, timeout_ms=None): + from .l2cap import accept + + return await accept(self, psm, mtu, timeout_ms) + + # Attempt to connect to a listening device. + async def l2cap_connect(self, psm, mtu, timeout_ms=1000): + from .l2cap import connect + + return await connect(self, psm, mtu, timeout_ms) + + # Context manager -- automatically disconnect. + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_traceback): + await self.disconnect() diff --git a/ble/aioble/l2cap.py b/ble/aioble/l2cap.py new file mode 100644 index 0000000..713c441 --- /dev/null +++ b/ble/aioble/l2cap.py @@ -0,0 +1,214 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import uasyncio as asyncio + +from .core import ble, log_error, register_irq_handler +from .device import DeviceConnection + + +_IRQ_L2CAP_ACCEPT = const(22) +_IRQ_L2CAP_CONNECT = const(23) +_IRQ_L2CAP_DISCONNECT = const(24) +_IRQ_L2CAP_RECV = const(25) +_IRQ_L2CAP_SEND_READY = const(26) + + +# Once we start listening we're listening forever. (Limitation in NimBLE) +_listening = False + + +def _l2cap_irq(event, data): + if event not in ( + _IRQ_L2CAP_CONNECT, + _IRQ_L2CAP_DISCONNECT, + _IRQ_L2CAP_RECV, + _IRQ_L2CAP_SEND_READY, + ): + return + + # All the L2CAP events start with (conn_handle, cid, ...) + if connection := DeviceConnection._connected.get(data[0], None): + if channel := connection._l2cap_channel: + # Expect to match the cid for this conn handle (unless we're + # waiting for connection in which case channel._cid is None). + if channel._cid is not None and channel._cid != data[1]: + return + + # Update the channel object with new information. + if event == _IRQ_L2CAP_CONNECT: + _, channel._cid, _, channel.our_mtu, channel.peer_mtu = data + elif event == _IRQ_L2CAP_DISCONNECT: + _, _, psm, status = data + channel._status = status + channel._cid = None + connection._l2cap_channel = None + elif event == _IRQ_L2CAP_RECV: + channel._data_ready = True + elif event == _IRQ_L2CAP_SEND_READY: + channel._stalled = False + + # Notify channel. + channel._event.set() + + +def _l2cap_shutdown(): + global _listening + _listening = False + + +register_irq_handler(_l2cap_irq, _l2cap_shutdown) + + +# The channel was disconnected during a send/recvinto/flush. +class L2CAPDisconnectedError(Exception): + pass + + +# Failed to connect to connection (argument is status). +class L2CAPConnectionError(Exception): + pass + + +class L2CAPChannel: + def __init__(self, connection): + if not connection.is_connected(): + raise ValueError("Not connected") + + if connection._l2cap_channel: + raise ValueError("Already has channel") + connection._l2cap_channel = self + + self._connection = connection + + # Maximum size that the other side can send to us. + self.our_mtu = 0 + # Maximum size that we can send. + self.peer_mtu = 0 + + # Set back to None on disconnection. + self._cid = None + # Set during disconnection. + self._status = 0 + + # If true, must wait for _IRQ_L2CAP_SEND_READY IRQ before sending. + self._stalled = False + + # Has received a _IRQ_L2CAP_RECV since the buffer was last emptied. + self._data_ready = False + + self._event = asyncio.ThreadSafeFlag() + + def _assert_connected(self): + if self._cid is None: + raise L2CAPDisconnectedError + + async def recvinto(self, buf, timeout_ms=None): + self._assert_connected() + + # Wait until the data_ready flag is set. This flag is only ever set by + # the event and cleared by this function. + with self._connection.timeout(timeout_ms): + while not self._data_ready: + await self._event.wait() + self._assert_connected() + + self._assert_connected() + + # Extract up to len(buf) bytes from the channel buffer. + n = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, buf) + + # Check if there's still remaining data in the channel buffers. + self._data_ready = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, None) > 0 + + return n + + # Synchronously see if there's data ready. + def available(self): + self._assert_connected() + return self._data_ready + + # Waits until the channel is free and then sends buf. + # If the buffer is larger than the MTU it will be sent in chunks. + async def send(self, buf, timeout_ms=None, chunk_size=None): + self._assert_connected() + offset = 0 + chunk_size = min(self.our_mtu * 2, self.peer_mtu, chunk_size or self.peer_mtu) + mv = memoryview(buf) + while offset < len(buf): + if self._stalled: + await self.flush(timeout_ms) + # l2cap_send returns True if you can send immediately. + self._stalled = not ble.l2cap_send( + self._connection._conn_handle, + self._cid, + mv[offset : offset + chunk_size], + ) + offset += chunk_size + + async def flush(self, timeout_ms=None): + self._assert_connected() + # Wait for the _stalled flag to be cleared by the IRQ. + with self._connection.timeout(timeout_ms): + while self._stalled: + await self._event.wait() + self._assert_connected() + + async def disconnect(self, timeout_ms=1000): + if self._cid is None: + return + + # Wait for the cid to be cleared by the disconnect IRQ. + ble.l2cap_disconnect(self._connection._conn_handle, self._cid) + await self.disconnected(timeout_ms) + + async def disconnected(self, timeout_ms=1000): + with self._connection.timeout(timeout_ms): + while self._cid is not None: + await self._event.wait() + + # Context manager -- automatically disconnect. + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_traceback): + await self.disconnect() + + +# Use connection.l2cap_accept() instead of calling this directly. +async def accept(connection, psm, mtu, timeout_ms): + global _listening + + channel = L2CAPChannel(connection) + + # Start the stack listening if necessary. + if not _listening: + ble.l2cap_listen(psm, mtu) + _listening = True + + # Wait for the connect irq from the remote connection. + with connection.timeout(timeout_ms): + await channel._event.wait() + return channel + + +# Use connection.l2cap_connect() instead of calling this directly. +async def connect(connection, psm, mtu, timeout_ms): + if _listening: + raise ValueError("Can't connect while listening") + + channel = L2CAPChannel(connection) + + with connection.timeout(timeout_ms): + ble.l2cap_connect(connection._conn_handle, psm, mtu) + + # Wait for the connect irq from the remote connection. + # If the connection fails, we get a disconnect event (with status) instead. + await channel._event.wait() + + if channel._cid is not None: + return channel + else: + raise L2CAPConnectionError(channel._status) diff --git a/ble/aioble/peripheral.py b/ble/aioble/peripheral.py new file mode 100644 index 0000000..099f2c5 --- /dev/null +++ b/ble/aioble/peripheral.py @@ -0,0 +1,179 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import bluetooth +import struct + +import uasyncio as asyncio + +from .core import ( + ensure_active, + ble, + log_info, + log_error, + log_warn, + register_irq_handler, +) +from .device import Device, DeviceConnection, DeviceTimeout + + +_IRQ_CENTRAL_CONNECT = const(1) +_IRQ_CENTRAL_DISCONNECT = const(2) + + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_UUID32_COMPLETE = const(0x5) +_ADV_TYPE_UUID128_COMPLETE = const(0x7) +_ADV_TYPE_UUID16_MORE = const(0x2) +_ADV_TYPE_UUID32_MORE = const(0x4) +_ADV_TYPE_UUID128_MORE = const(0x6) +_ADV_TYPE_APPEARANCE = const(0x19) +_ADV_TYPE_MANUFACTURER = const(0xFF) + +_ADV_PAYLOAD_MAX_LEN = const(31) + + +_incoming_connection = None +_connect_event = None + + +def _peripheral_irq(event, data): + global _incoming_connection + + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, addr_type, addr = data + + # Create, initialise, and register the device. + device = Device(addr_type, bytes(addr)) + _incoming_connection = DeviceConnection(device) + _incoming_connection._conn_handle = conn_handle + DeviceConnection._connected[conn_handle] = _incoming_connection + + # Signal advertise() to return the connected device. + _connect_event.set() + + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _ = data + if connection := DeviceConnection._connected.get(conn_handle, None): + # Tell the device_task that it should terminate. + connection._event.set() + + +def _peripheral_shutdown(): + global _incoming_connection, _connect_event + _incoming_connection = None + _connect_event = None + + +register_irq_handler(_peripheral_irq, _peripheral_shutdown) + + +# Advertising payloads are repeated packets of the following form: +# 1 byte data length (N + 1) +# 1 byte type (see constants below) +# N bytes type-specific data +def _append(adv_data, resp_data, adv_type, value): + data = struct.pack("BB", len(value) + 1, adv_type) + value + + if len(data) + len(adv_data) < _ADV_PAYLOAD_MAX_LEN: + adv_data += data + return resp_data + + if len(data) + (len(resp_data) if resp_data else 0) < _ADV_PAYLOAD_MAX_LEN: + if not resp_data: + # Overflow into resp_data for the first time. + resp_data = bytearray() + resp_data += data + return resp_data + + raise ValueError("Advertising payload too long") + + +async def advertise( + interval_us, + adv_data=None, + resp_data=None, + connectable=True, + limited_disc=False, + br_edr=False, + name=None, + services=None, + appearance=0, + manufacturer=None, + timeout_ms=None, +): + global _incoming_connection, _connect_event + + ensure_active() + + if not adv_data and not resp_data: + # If the user didn't manually specify adv_data / resp_data then + # construct them from the kwargs. Keep adding fields to adv_data, + # overflowing to resp_data if necessary. + # TODO: Try and do better bin-packing than just concatenating in + # order? + + adv_data = bytearray() + + resp_data = _append( + adv_data, + resp_data, + _ADV_TYPE_FLAGS, + struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)), + ) + + # Services are prioritised to go in the advertising data because iOS supports + # filtering scan results by service only, so services must come first. + if services: + for uuid in services: + b = bytes(uuid) + if len(b) == 2: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID16_COMPLETE, b) + elif len(b) == 4: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID32_COMPLETE, b) + elif len(b) == 16: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID128_COMPLETE, b) + + if name: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_NAME, name) + + if appearance: + # See org.bluetooth.characteristic.gap.appearance.xml + resp_data = _append( + adv_data, resp_data, _ADV_TYPE_APPEARANCE, struct.pack(" + cmd = read[0] + #print('received from BLE', read) + self.sendDictToCom({'type':'debug', 'from':'fromBle','cmd':cmd, 'data':read[1:]}) + if cmd in [_COMMAND_SENDCHUNK, _COMMAND_SENDBYTESCHUNK]: + #self.sendDictToCom({'type':'debug', 'from':'chunkFromBle','string':msgChunk}) + msgChunk += read[1:].decode() + elif cmd in [_COMMAND_SENDDATA, _COMMAND_SENDBYTESDATA]: + # message to send to computer over COM port + msgFormat = 'base64' if cmd == _COMMAND_SENDBYTESDATA else 'str' + msg = msgChunk + read[1:].decode() + self.sendDictToCom({'type':'msgFromBle', 'format':msgFormat, 'string':msg}) + msgChunk = '' + + async def sendData(self, data:str, base64:bool=False): + """Send a string or bytes sequence over BLE + :param data: string to send (plain str or base64 formated) + :param base64: if True, data is a base64 formated string""" + sendMsgType = _COMMAND_SENDBYTESCHUNK if base64 else _COMMAND_SENDCHUNK + while len(data) > MAX_MSG_DATA_LENGTH: + chunk = data[:MAX_MSG_DATA_LENGTH] + self.sendDictToCom({'type':'debug', 'from':'sendChunkToBle','string':chunk}) + await self._command(sendMsgType, chunk.encode()) + data = data[MAX_MSG_DATA_LENGTH:] + sendMsgType = _COMMAND_SENDBYTESDATA if base64 else _COMMAND_SENDDATA + #self.sendDictToCom({'type':'msgType', 'strOrBase64':sendMsgType, 'sentdata':data}) + await self._command(sendMsgType, data.encode()) + self.sendDictToCom({'type':'sentMessage'}) + + async def disconnect(self): + if self._connection: + await self._connection.disconnect() + + def sendDictToCom(self, data:dict): + print(json.dumps(data)) + +async def main(): + print('start dongle') + while True: + try: + line = input() + except KeyboardInterrupt: + # when ctrl-C is sent to dongle, we receive a KeyboardInterrupt + sys.exit(0) + #print('dongle received:', line) + receivedMsgDict = json.loads(line) + if receivedMsgDict['type'] == 'connect': + # => start BLE scan and connect on this peripheral + peripheralName = receivedMsgDict['name'] + async with aioble.scan(5000, 30000, 30000, active=True) as scanner: + async for result in scanner: + # print('scan', result.name(), result.services()) + print('scan', result.name(), result.rssi, result.services()) + if result.name() == peripheralName and _SERVICE_UUID in result.services(): + device = result.device + break + else: + print("Server not found") + return + + client = ManageDongle(device) + await client.connect() + elif receivedMsgDict['type'] == 'disconnect': + await client.disconnect() + elif receivedMsgDict['type'] == 'msg': + #msgFormat = 'base64' in receivedMsgDict + if 'format' not in receivedMsgDict or receivedMsgDict['format'] not in ['str', 'base64']: + client.sendDictToCom({'type':'badMessage', 'error':'invalid format', 'received':receivedMsgDict}) + continue + msgFormat = True if receivedMsgDict['format'] == 'base64' else False + await client.sendData(receivedMsgDict['string'], base64=msgFormat) + else: + print('unknown message type', receivedMsgDict) + await client.disconnect() + +asyncio.run(main()) diff --git a/ble/mainPcTestBLE.py b/ble/mainPcTestBLE.py new file mode 100644 index 0000000..641d986 --- /dev/null +++ b/ble/mainPcTestBLE.py @@ -0,0 +1,85 @@ +# python -m serial.tools.list_ports +# python mainPcTestBLE.py -p + +# In this example, PC will send some messages to robot, +# and verify it receives checksum of these messages from robot +# Note: if message from PC to robot exceeds 18 characters, it will be split in +# several BLE messages, then merged at robot side to get original message + +import sys +import binascii +import time +import argparse +import random +import ComWithDongle + +robotName = 'myTeamName' + +randCharRange = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' + +expectedToReceive = [] + +def onMsgFromRobot(data:str): + """Function to call when a message sent by robot is received + :param data: message""" + print('received msg', data, flush=True) + print('compair to', expectedToReceive, flush=True) + if data in expectedToReceive: + expectedToReceive.remove(data) + print('-not received yet', len(expectedToReceive), expectedToReceive, flush=True) + else: + print('bad message received', data) + print('expected to receive') + for s in expectedToReceive: + print(' ', s) + exit(1) + +parser = argparse.ArgumentParser( + description='Script to communicate with STM32WB55 dongle connected on computer') +parser.add_argument('-p', '--portcom', type=str, help='id of com port used') +parser.add_argument('-d', '--debug', action='store_true', help='display debug messages') +parser.add_argument('-l', '--length', type=int, default=16, + help='number of characters to send over BLE, in each message') +parser.add_argument('-n', '--number', type=int, default=5 , help='number of messages to send over BLE') +parser.add_argument('-b', '--bytes', action='store_true', help='send bytes instead of string') +args = parser.parse_args() + + +try: + print('start main') + # wait BLE connection is established + com = ComWithDongle.ComWithDongle(comPort=args.portcom, peripheralName=robotName, + onMsgReceived=onMsgFromRobot, debug=args.debug) + print('connected to', robotName) + msgId = 0 + while True: + if args.bytes: + data = random.randbytes(args.length) + else: + data = ''.join([random.choice(randCharRange) for _ in range(args.length)]) + print('send data', len(data), msgId, data, flush=True) + checksum = binascii.crc32(data) + expectedToReceive.append(str(checksum)) + com.sendMsg(data) + print('+not received yet', len(expectedToReceive), expectedToReceive, flush=True) + msgId += 1 + if msgId >= args.number: break + #time.sleep(0.01) + time.sleep(0.2) + #all messages sent, wait while we receive some messages + com.sendMsg('END') + nbMissing = len(expectedToReceive) + lastNbMissing = 0 + while not nbMissing == lastNbMissing: + if nbMissing == 0: + print('all messages received') + exit(0) + print('missing', expectedToReceive, flush=True) + lastNbMissing = nbMissing + com.sendMsg('END') + time.sleep(2) + nbMissing = len(expectedToReceive) +except KeyboardInterrupt: + pass +com.disconnect() +exit(0) \ No newline at end of file diff --git a/ble/mainRobotTestBLE.py b/ble/mainRobotTestBLE.py new file mode 100644 index 0000000..2bdff6c --- /dev/null +++ b/ble/mainRobotTestBLE.py @@ -0,0 +1,41 @@ +# to know COM port used when connected on PC: +# python -m serial.tools.list_ports + +# in this example, robot will send back to PC the checksum of each message received + +import binascii +import uasyncio as asyncio +import RobotBleServer + +robotName = 'myTeamName' + +toSend = [] + +def onMsgToRobot(data:str|bytes): + """Function to call when a message sent by PC is received + :param data: message received""" + checksum = binascii.crc32(data) + print('received', data, '=>', checksum) + toSend.append(str(checksum)) + +async def robotMainTask(bleConnection): + """Main function for robot activities + :param bleConnection: object to check BLE connection and send messages""" + while True: + await asyncio.sleep(0.1) + #print('connection', bleConnection.connection) + if not bleConnection.connection: continue + if toSend == []: continue + while not toSend == []: + data = toSend.pop(0) + bleConnection.sendMessage(data) + print('sent', data) + +# Run tasks +async def main(): + print('Start main') + bleConnection = RobotBleServer.RobotBleServer(robotName=robotName, onMsgReceived=onMsgToRobot) + asyncio.create_task(robotMainTask(bleConnection)) + await bleConnection.communicationTask() + +asyncio.run(main()) diff --git a/ble/toDongle.sh b/ble/toDongle.sh new file mode 100644 index 0000000..cc0ff1f --- /dev/null +++ b/ble/toDongle.sh @@ -0,0 +1,11 @@ +#!/usr/bin/bash + +drive=$1 +if [[($drive == "")]]; then + echo missing drive + exit 1 +fi + +cp -r aioble $drive/ +cp mainDongle.py $drive/main.py +sync diff --git a/ble/toRobot.sh b/ble/toRobot.sh new file mode 100644 index 0000000..a6c215e --- /dev/null +++ b/ble/toRobot.sh @@ -0,0 +1,12 @@ +#!/usr/bin/bash + +drive=$1 +if [[($drive == "")]]; then + echo missing drive + exit 1 +fi + +cp -r aioble $drive/ +cp RobotBleServer.py $drive +cp mainRobotTestBLE.py $drive/main.py +sync