MAJ gitlab

This commit is contained in:
Nogard 2026-03-22 00:26:24 +01:00
parent ea5f707d05
commit b6e0498dc3
17 changed files with 2615 additions and 0 deletions

95
ble/ComWithDongle.py Normal file
View File

@ -0,0 +1,95 @@
# python -m serial.tools.list_ports
import sys
import time
#import binascii
import base64
import serial
import threading
import json
ctrlC = bytes.fromhex("03")
ctrlD = bytes.fromhex("04")
class ComWithDongle:
"""Class to manage communication with dongle, over virtual COM port"""
def __init__(self, comPort:str, peripheralName:str, onMsgReceived, debug=False):
""":param comPort: name of COM port used by dongle
:param peripheralName: name of BLE peripheral
:param onMsgReceived: function to call when a message from peripheral is received
:param debug: when True, print debug messages received from dongle"""
try:
self.ser = serial.Serial(port=comPort, baudrate=115200, timeout=2)
except serial.SerialException:
exit(f"no device on port {comPort}")
self.bleConnected = threading.Semaphore(0)
self.messageSent = threading.Semaphore(0)
self.onMsgReceived = onMsgReceived
self.debug = debug
self.resetDongle()
threading.Thread(name='readComPort', target=self.readFromComPort, daemon=True).start()
# first message over COM port to dongle is to define BLE peripheral to connect on
self.sendDict({'type':'connect','name':peripheralName})
timeoutNotReached = self.bleConnected.acquire(timeout=5)
if not timeoutNotReached:
exit(f'unable to connect to peripheral "{peripheralName}"')
def resetDongle(self):
self.ser.write(ctrlC)
self.ser.write(ctrlD)
self.ser.flush()
time.sleep(2)
def sendDict(self, msg:dict):
self.ser.write(json.dumps(msg).encode("utf-8") + b'\r')
def sendMsg(self, msg:str|bytes):
if isinstance(msg, str):
self.sendDict({'type':'msg', 'format':'str', 'string':msg})
else:
#self.sendDict({'type':'msg', 'base64':binascii.b2a_base64(msg).decode("utf-8")})
#b = binascii.b2a_base64(msg).decode("utf-8").rstrip()
#b = base64.b64encode(msg).decode("utf-8").rstrip()
b = base64.b64encode(msg).decode("utf-8").rstrip()
if self.debug: print('sendMsg', msg, '=>', b, flush=True)
self.sendDict({'type':'msg', 'format':'base64', 'string':b})
self.messageSent.acquire(timeout=2)
def disconnect(self):
self.sendDict({'type': 'disconnect'})
def readFromComPort(self):
while True:
line = self.ser.readline().rstrip()
# valid message can't be empty
if type(line) is not bytes or line == b'':
# empty message received after a timeout on serial connection, to ignore
continue
line = line.decode("utf-8")
try:
receivedMsgDict = json.loads(line)
except json.decoder.JSONDecodeError:
# this is not a dictionnary, just a debug message
if self.debug: print('from COM:', line, flush=True)
continue
msgType = receivedMsgDict['type']
if msgType == 'connected':
self.bleConnected.release()
elif msgType == 'sentMessage':
self.messageSent.release()
elif msgType == 'msgFromBle':
if receivedMsgDict['format'] == 'str':
self.onMsgReceived(receivedMsgDict['string'])
else:
if self.debug: print('base64 msg from BLE:', len(receivedMsgDict['string']), receivedMsgDict['string'])
#self.onMsgReceived(binascii.a2b_base64(receivedMsgDict['string']))
self.onMsgReceived(base64.b64decode(receivedMsgDict['string']))
elif msgType == 'debug':
if self.debug:
del(receivedMsgDict['type'])
print('debug COM:', receivedMsgDict)
elif msgType in ['connect', 'msg']:
pass
else:
print('unknown msg type', receivedMsgDict)

48
ble/README.md Normal file
View File

@ -0,0 +1,48 @@
# BLE example
Here is an example of driver to send messages using BLE
# Setup on robot (or other BLE advertiser)
Copy following files to robot
- aioble/\*
- RobotBleServer.py
- mainRobotTestBLE.py (to rename as main.py)
You can use script toRobot.sh for that, for example when run from a Windows git bash,
if robot is connected on drive D:, you can run
> ./toRobot.sh /d
# Setup on USB dongle
Copy following files to robot
- aioble/\*
- mainDongle.py (to rename as main.py)
You can use script toDongle.sh for that, for example when run from a Windows git bash,
if dongle is connected on drive E:, you can run
> ./toDongle.sh /e
# Setup on computer
You need pyserial module for python. You can install it using command
> python -m pip install pyserial
or if a proxy is required
> python -m pip install --proxy \<http proxy parameter\> pyserial
Then run following command
> python mainPcTestBLE.py --portcom \<com port used by dongle\>
To know COM port to use as argument, run following command before and after dongle connection:
> python -m serial.tools.list_ports
Port in second result but not in first result is port used by dongle.
# Connect on the good robot
When several robots are started at same time, they shall have a unique identifier so you can connect over BLE on the good robot.
For that, you shall replace "myTeamName" by a unique identifer (for example the name of your team) in following files:
- mainRobotTestBLE.py
- mainPcTestBLE.py
# Note relative to BLE
The Bluetooth is a connection with a limited transfer rate. If you try to send a lot of messages in a short period of time, or transfer long messages, the BLE driver will do it's best to transfer all data but expect delay to receive messages on the other side.

113
ble/RobotBleServer.py Normal file
View File

@ -0,0 +1,113 @@
# to know COM port used when connected on PC:
# python -m serial.tools.list_ports
import binascii
import sys
sys.path.append("")
from micropython import const
import aioble
import bluetooth
import struct
_SERVICE_UUID = bluetooth.UUID(0x1234)
_CHAR_UUID = bluetooth.UUID(0x1235)
# How frequently to send advertising beacons.
_ADV_INTERVAL_MS = 250_000
MAX_MSG_DATA_LENGTH = const(18)
_COMMAND_DONE = const(0)
_COMMAND_SENDDATA = const(1)
_COMMAND_SENDCHUNK = const(2) # send chunk of string, use _COMMAND_SENDDATA for last chunk
_COMMAND_SENDBYTESDATA = const(3)
_COMMAND_SENDBYTESCHUNK = const(4) # send chunk of string base64 formatted, use _COMMAND_SENDBYTESDATA for last chunk
class RobotBleServer:
"""Class to manage connection with BLE"""
def __init__(self, robotName:str, onMsgReceived):
""":param robotName: name to use in advertising
:param onMsgReceived: function to call when a message is received"""
self.robotName = robotName
self.onMsgReceived = onMsgReceived
# Register GATT server.
service = aioble.Service(_SERVICE_UUID)
self.characteristic = aioble.Characteristic(service, _CHAR_UUID, write=True, notify=True)
aioble.register_services(service)
self.connection = None
def sendMessage(self, msg:str|bytes):
"""Send a message over BLE
Message can be a string or a bytes sequence (maximum 18 charaters/bytes per message)
:param msg: message to send"""
if type(msg) == str:
encodedMsg = msg.encode()
sendMsgType, sendChunkMsgType = _COMMAND_SENDDATA, _COMMAND_SENDCHUNK
elif type(msg) == bytes:
#msg = binascii.b2a_base64(msg).encode()
encodedMsg = binascii.b2a_base64(msg).rstrip()
sendMsgType, sendChunkMsgType = _COMMAND_SENDBYTESDATA, _COMMAND_SENDBYTESCHUNK
else:
raise Exception('unsupported message type', type(msg))
print('encode', type(msg), msg, '=>', encodedMsg)
while len(encodedMsg) > MAX_MSG_DATA_LENGTH:
chunk = encodedMsg[:MAX_MSG_DATA_LENGTH]
self.characteristic.notify(self.connection, struct.pack("<B", sendChunkMsgType) + chunk)
encodedMsg = encodedMsg[MAX_MSG_DATA_LENGTH:]
print('sent chunk', chunk)
self.characteristic.notify(self.connection, struct.pack("<B", sendMsgType) + encodedMsg)
print('sent last', encodedMsg)
async def bleTask(self):
"""Loop to wait for incoming messages over BLE.
When a received message is complete, call function defined in self.onMsgReceived
When BLE connection is closed, stop this function"""
try:
with self.connection.timeout(None):
dataChunk = ''
msgId = 0
while True:
await self.characteristic.written()
msg = self.characteristic.read()
#self.characteristic.write(b"")
if len(msg) < 3:
continue
# Message is <command><seq><data>.
command = msg[0]
op_seq = int(msg[1])
msgData = msg[2:].decode()
#print('MSG=', msg)
if command in (_COMMAND_SENDCHUNK, _COMMAND_SENDBYTESCHUNK):
dataChunk += msgData
print('received chunk', msgData, '=>', dataChunk)
elif command in (_COMMAND_SENDDATA, _COMMAND_SENDBYTESDATA):
data = dataChunk + msgData
dataChunk = ''
if command == _COMMAND_SENDBYTESDATA:
data = binascii.a2b_base64(data)
#print('received data:', data)
print('received:', len(data), msgId, type(data), data)
self.onMsgReceived(data)
msgId += 1
except aioble.DeviceDisconnectedError:
print('disconnected BLE')
return
async def communicationTask(self):
"""Loop to advertise and wait for connection.
When connection is established, start task to read incoming messages"""
while True:
print("Waiting for connection")
self.connection = await aioble.advertise(
_ADV_INTERVAL_MS,
name=self.robotName,
services=[_SERVICE_UUID],
)
print("Connection from", self.connection.device)
await self.bleTask()
await self.connection.disconnected()
self.connection = None

32
ble/aioble/__init__.py Normal file
View File

@ -0,0 +1,32 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
from .device import Device, DeviceDisconnectedError
from .core import log_info, log_warn, log_error, GattError, config, stop
try:
from .peripheral import advertise
except:
log_info("Peripheral support disabled")
try:
from .central import scan
except:
log_info("Central support disabled")
try:
from .server import (
Service,
Characteristic,
BufferedCharacteristic,
Descriptor,
register_services,
)
except:
log_info("GATT server support disabled")
ADDR_PUBLIC = const(0)
ADDR_RANDOM = const(1)

297
ble/aioble/central.py Normal file
View File

@ -0,0 +1,297 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import bluetooth
import struct
import uasyncio as asyncio
from .core import (
ensure_active,
ble,
log_info,
log_error,
log_warn,
register_irq_handler,
)
from .device import Device, DeviceConnection, DeviceTimeout
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_ADV_IND = const(0)
_ADV_DIRECT_IND = const(1)
_ADV_SCAN_IND = const(2)
_ADV_NONCONN_IND = const(3)
_SCAN_RSP = const(4)
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_SHORT_NAME = const(0x08)
_ADV_TYPE_UUID16_INCOMPLETE = const(0x2)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_INCOMPLETE = const(0x4)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_INCOMPLETE = const(0x6)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_APPEARANCE = const(0x19)
_ADV_TYPE_MANUFACTURER = const(0xFF)
# Keep track of the active scanner so IRQs can be delivered to it.
_active_scanner = None
# Set of devices that are waiting for the peripheral connect IRQ.
_connecting = set()
def _central_irq(event, data):
# Send results and done events to the active scanner instance.
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
if not _active_scanner:
return
_active_scanner._queue.append((addr_type, bytes(addr), adv_type, rssi, bytes(adv_data)))
_active_scanner._event.set()
elif event == _IRQ_SCAN_DONE:
if not _active_scanner:
return
_active_scanner._done = True
_active_scanner._event.set()
# Peripheral connect must be in response to a pending connection, so find
# it in the pending connection set.
elif event == _IRQ_PERIPHERAL_CONNECT:
conn_handle, addr_type, addr = data
for d in _connecting:
if d.addr_type == addr_type and d.addr == addr:
# Allow connect() to complete.
connection = d._connection
connection._conn_handle = conn_handle
connection._event.set()
break
# Find the active device connection for this connection handle.
elif event == _IRQ_PERIPHERAL_DISCONNECT:
conn_handle, _, _ = data
if connection := DeviceConnection._connected.get(conn_handle, None):
# Tell the device_task that it should terminate.
connection._event.set()
def _central_shutdown():
global _active_scanner, _connecting
_active_scanner = None
_connecting = set()
register_irq_handler(_central_irq, _central_shutdown)
# Cancel an in-progress scan.
async def _cancel_pending():
if _active_scanner:
await _active_scanner.cancel()
# Start connecting to a peripheral.
# Call device.connect() rather than using method directly.
async def _connect(connection, timeout_ms):
device = connection.device
if device in _connecting:
return
# Enable BLE and cancel in-progress scans.
ensure_active()
await _cancel_pending()
# Allow the connected IRQ to find the device by address.
_connecting.add(device)
# Event will be set in the connected IRQ, and then later
# re-used to notify disconnection.
connection._event = connection._event or asyncio.ThreadSafeFlag()
try:
with DeviceTimeout(None, timeout_ms):
ble.gap_connect(device.addr_type, device.addr)
# Wait for the connected IRQ.
await connection._event.wait()
assert connection._conn_handle is not None
# Register connection handle -> device.
DeviceConnection._connected[connection._conn_handle] = connection
finally:
# After timeout, don't hold a reference and ignore future events.
_connecting.remove(device)
# Represents a single device that has been found during a scan. The scan
# iterator will return the same ScanResult instance multiple times as its data
# changes (i.e. changing RSSI or advertising data).
class ScanResult:
def __init__(self, device):
self.device = device
self.adv_data = None
self.resp_data = None
self.rssi = None
self.connectable = False
# New scan result available, return true if it changes our state.
def _update(self, adv_type, rssi, adv_data):
updated = False
if rssi != self.rssi:
self.rssi = rssi
updated = True
if adv_type in (_ADV_IND, _ADV_NONCONN_IND):
if adv_data != self.adv_data:
self.adv_data = adv_data
self.connectable = adv_type == _ADV_IND
updated = True
elif adv_type == _ADV_SCAN_IND:
if adv_data != self.adv_data and self.resp_data:
updated = True
self.adv_data = adv_data
elif adv_type == _SCAN_RSP and adv_data:
if adv_data != self.resp_data:
self.resp_data = adv_data
updated = True
return updated
def __str__(self):
return "Scan result: {} {}".format(self.device, self.rssi)
# Gets all the fields for the specified types.
def _decode_field(self, *adv_type):
# Advertising payloads are repeated packets of the following form:
# 1 byte data length (N + 1)
# 1 byte type (see constants below)
# N bytes type-specific data
for payload in (self.adv_data, self.resp_data):
if not payload:
continue
i = 0
while i + 1 < len(payload):
if payload[i + 1] in adv_type:
yield payload[i + 2 : i + payload[i] + 1]
i += 1 + payload[i]
# Returns the value of the complete (or shortened) advertised name, if available.
def name(self):
for n in self._decode_field(_ADV_TYPE_NAME, _ADV_TYPE_SHORT_NAME):
return str(n, "utf-8") if n else ""
# Generator that enumerates the service UUIDs that are advertised.
def services(self):
for u in self._decode_field(_ADV_TYPE_UUID16_INCOMPLETE, _ADV_TYPE_UUID16_COMPLETE):
yield bluetooth.UUID(struct.unpack("<H", u)[0])
for u in self._decode_field(_ADV_TYPE_UUID32_INCOMPLETE, _ADV_TYPE_UUID32_COMPLETE):
yield bluetooth.UUID(struct.unpack("<I", u)[0])
for u in self._decode_field(_ADV_TYPE_UUID128_INCOMPLETE, _ADV_TYPE_UUID128_COMPLETE):
yield bluetooth.UUID(u)
# Generator that returns (manufacturer_id, data) tuples.
def manufacturer(self, filter=None):
for u in self._decode_field(_ADV_TYPE_MANUFACTURER):
if len(u) < 2:
continue
m = struct.unpack("<H", u[0:2])[0]
if filter is None or m == filter:
yield (m, u[2:])
# Use with:
# async with aioble.scan(...) as scanner:
# async for result in scanner:
# ...
class scan:
def __init__(self, duration_ms, interval_us=None, window_us=None, active=False):
self._queue = []
self._event = asyncio.ThreadSafeFlag()
self._done = False
# Keep track of what we've already seen.
self._results = set()
# Ideally we'd start the scan here and avoid having to save these
# values, but we need to stop any previous scan first via awaiting
# _cancel_pending(), but __init__ isn't async.
self._duration_ms = duration_ms
self._interval_us = interval_us or 1280000
self._window_us = window_us or 11250
self._active = active
async def __aenter__(self):
global _active_scanner
ensure_active()
await _cancel_pending()
_active_scanner = self
ble.gap_scan(self._duration_ms, self._interval_us, self._window_us, self._active)
return self
async def __aexit__(self, exc_type, exc_val, exc_traceback):
# Cancel the current scan if we're still the active scanner. This will
# happen if the loop breaks early before the scan duration completes.
if _active_scanner == self:
await self.cancel()
def __aiter__(self):
assert _active_scanner == self
return self
async def __anext__(self):
global _active_scanner
if _active_scanner != self:
# The scan has been canceled (e.g. a connection was initiated).
raise StopAsyncIteration
while True:
while self._queue:
addr_type, addr, adv_type, rssi, adv_data = self._queue.pop()
# Try to find an existing ScanResult for this device.
for r in self._results:
if r.device.addr_type == addr_type and r.device.addr == addr:
result = r
break
else:
# New device, create a new Device & ScanResult.
device = Device(addr_type, addr)
result = ScanResult(device)
self._results.add(result)
# Add the new information from this event.
if result._update(adv_type, rssi, adv_data):
# It's new information, so re-yield this result.
return result
if self._done:
# _IRQ_SCAN_DONE event was fired.
_active_scanner = None
raise StopAsyncIteration
# Wait for either done or result IRQ.
await self._event.wait()
# Cancel any in-progress scan. We need to do this before starting any other operation.
async def cancel(self):
if self._done:
return
ble.gap_scan(None)
while not self._done:
await self._event.wait()
global _active_scanner
_active_scanner = None

456
ble/aioble/client.py Normal file
View File

@ -0,0 +1,456 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
from collections import deque
import uasyncio as asyncio
import struct
import bluetooth
from .core import ble, GattError, register_irq_handler
from .device import DeviceConnection
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_CCCD_UUID = const(0x2902)
_CCCD_NOTIFY = const(1)
_CCCD_INDICATE = const(2)
_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
# Forward IRQs directly to static methods on the type that handles them and
# knows how to map handles to instances. Note: We copy all uuid and data
# params here for safety, but a future optimisation might be able to avoid
# these copies in a few places.
def _client_irq(event, data):
if event == _IRQ_GATTC_SERVICE_RESULT:
conn_handle, start_handle, end_handle, uuid = data
ClientDiscover._discover_result(
conn_handle, start_handle, end_handle, bluetooth.UUID(uuid)
)
elif event == _IRQ_GATTC_SERVICE_DONE:
conn_handle, status = data
ClientDiscover._discover_done(conn_handle, status)
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
conn_handle, end_handle, value_handle, properties, uuid = data
ClientDiscover._discover_result(
conn_handle, end_handle, value_handle, properties, bluetooth.UUID(uuid)
)
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
conn_handle, status = data
ClientDiscover._discover_done(conn_handle, status)
elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
conn_handle, dsc_handle, uuid = data
ClientDiscover._discover_result(conn_handle, dsc_handle, bluetooth.UUID(uuid))
elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
conn_handle, status = data
ClientDiscover._discover_done(conn_handle, status)
elif event == _IRQ_GATTC_READ_RESULT:
conn_handle, value_handle, char_data = data
ClientCharacteristic._read_result(conn_handle, value_handle, bytes(char_data))
elif event == _IRQ_GATTC_READ_DONE:
conn_handle, value_handle, status = data
ClientCharacteristic._read_done(conn_handle, value_handle, status)
elif event == _IRQ_GATTC_WRITE_DONE:
conn_handle, value_handle, status = data
ClientCharacteristic._write_done(conn_handle, value_handle, status)
elif event == _IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data
ClientCharacteristic._on_notify(conn_handle, value_handle, bytes(notify_data))
elif event == _IRQ_GATTC_INDICATE:
conn_handle, value_handle, indicate_data = data
ClientCharacteristic._on_indicate(conn_handle, value_handle, bytes(indicate_data))
register_irq_handler(_client_irq, None)
# Async generator for discovering services, characteristics, descriptors.
class ClientDiscover:
def __init__(self, connection, disc_type, parent, timeout_ms, *args):
self._connection = connection
# Each result IRQ will append to this.
self._queue = []
# This will be set by the done IRQ.
self._status = None
# Tell the generator to process new events.
self._event = asyncio.ThreadSafeFlag()
# Must implement the _start_discovery static method. Instances of this
# type are returned by __anext__.
self._disc_type = disc_type
# This will be the connection for a service discovery, and the service for a characteristic discovery.
self._parent = parent
# Timeout for the discovery process.
# TODO: Not implemented.
self._timeout_ms = timeout_ms
# Additional arguments to pass to the _start_discovery method on disc_type.
self._args = args
async def _start(self):
if self._connection._discover:
# TODO: cancel existing? (e.g. perhaps they didn't let the loop run to completion)
raise ValueError("Discovery in progress")
# Tell the connection that we're the active discovery operation (the IRQ only gives us conn_handle).
self._connection._discover = self
# Call the appropriate ubluetooth.BLE method.
self._disc_type._start_discovery(self._parent, *self._args)
def __aiter__(self):
return self
async def __anext__(self):
if self._connection._discover != self:
# Start the discovery if necessary.
await self._start()
# Keep returning items from the queue until the status is set by the
# done IRQ.
while True:
while self._queue:
return self._disc_type(self._parent, *self._queue.pop())
if self._status is not None:
self._connection._discover = None
raise StopAsyncIteration
# Wait for more results to be added to the queue.
await self._event.wait()
# Tell the active discovery instance for this connection to add a new result
# to the queue.
def _discover_result(conn_handle, *args):
if connection := DeviceConnection._connected.get(conn_handle, None):
if discover := connection._discover:
discover._queue.append(args)
discover._event.set()
# Tell the active discovery instance for this connection that it is complete.
def _discover_done(conn_handle, status):
if connection := DeviceConnection._connected.get(conn_handle, None):
if discover := connection._discover:
discover._status = status
discover._event.set()
# Represents a single service supported by a connection. Do not construct this
# class directly, instead use `async for service in connection.services([uuid])` or
# `await connection.service(uuid)`.
class ClientService:
def __init__(self, connection, start_handle, end_handle, uuid):
self.connection = connection
# Used for characteristic discovery.
self._start_handle = start_handle
self._end_handle = end_handle
# Allows comparison to a known uuid.
self.uuid = uuid
def __str__(self):
return "Service: {} {} {}".format(self._start_handle, self._end_handle, self.uuid)
# Search for a specific characteristic by uuid.
async def characteristic(self, uuid, timeout_ms=2000):
result = None
# Make sure loop runs to completion.
async for characteristic in self.characteristics(uuid, timeout_ms):
if not result and characteristic.uuid == uuid:
# Keep first result.
result = characteristic
return result
# Search for all services (optionally by uuid).
# Use with `async for`, e.g.
# async for characteristic in service.characteristics():
# Note: must allow the loop to run to completion.
def characteristics(self, uuid=None, timeout_ms=2000):
return ClientDiscover(self.connection, ClientCharacteristic, self, timeout_ms, uuid)
# For ClientDiscover
def _start_discovery(connection, uuid=None):
ble.gattc_discover_services(connection._conn_handle, uuid)
class BaseClientCharacteristic:
def __init__(self, value_handle, properties, uuid):
# Used for read/write/notify ops.
self._value_handle = value_handle
# Which operations are supported.
self.properties = properties
# Allows comparison to a known uuid.
self.uuid = uuid
if properties & _FLAG_READ:
# Fired for each read result and read done IRQ.
self._read_event = None
self._read_data = None
# Used to indicate that the read is complete.
self._read_status = None
if (properties & _FLAG_WRITE) or (properties & _FLAG_WRITE_NO_RESPONSE):
# Fired for the write done IRQ.
self._write_event = None
# Used to indicate that the write is complete.
self._write_status = None
# Register this value handle so events can find us.
def _register_with_connection(self):
self._connection()._characteristics[self._value_handle] = self
# Map an incoming IRQ to an registered characteristic.
def _find(conn_handle, value_handle):
if connection := DeviceConnection._connected.get(conn_handle, None):
if characteristic := connection._characteristics.get(value_handle, None):
return characteristic
else:
# IRQ for a characteristic that we weren't expecting. e.g.
# notification when we're not waiting on notified().
# TODO: This will happen on btstack, which doesn't give us
# value handle for the done event.
return None
def _check(self, flag):
if not (self.properties & flag):
raise ValueError("Unsupported")
# Issue a read to the characteristic.
async def read(self, timeout_ms=1000):
self._check(_FLAG_READ)
# Make sure this conn_handle/value_handle is known.
self._register_with_connection()
# This will be set by the done IRQ.
self._read_status = None
# This will be set by the result and done IRQs. Re-use if possible.
self._read_event = self._read_event or asyncio.ThreadSafeFlag()
# Issue the read.
ble.gattc_read(self._connection()._conn_handle, self._value_handle)
with self._connection().timeout(timeout_ms):
# The event will be set for each read result, then a final time for done.
while self._read_status is None:
await self._read_event.wait()
if self._read_status != 0:
raise GattError(self._read_status)
return self._read_data
# Map an incoming result IRQ to a registered characteristic.
def _read_result(conn_handle, value_handle, data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._read_data = data
characteristic._read_event.set()
# Map an incoming read done IRQ to a registered characteristic.
def _read_done(conn_handle, value_handle, status):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._read_status = status
characteristic._read_event.set()
async def write(self, data, response=None, timeout_ms=1000):
self._check(_FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE)
# If the response arg is unset, then default it to true if we only support write-with-response.
if response is None:
p = self.properties
response = (p & _FLAG_WRITE) and not (p & _FLAG_WRITE_NO_RESPONSE)
if response:
# Same as read.
self._register_with_connection()
self._write_status = None
self._write_event = self._write_event or asyncio.ThreadSafeFlag()
# Issue the write.
ble.gattc_write(self._connection()._conn_handle, self._value_handle, data, response)
if response:
with self._connection().timeout(timeout_ms):
# The event will be set for the write done IRQ.
await self._write_event.wait()
if self._write_status != 0:
raise GattError(self._write_status)
# Map an incoming write done IRQ to a registered characteristic.
def _write_done(conn_handle, value_handle, status):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._write_status = status
characteristic._write_event.set()
# Represents a single characteristic supported by a service. Do not construct
# this class directly, instead use `async for characteristic in
# service.characteristics([uuid])` or `await service.characteristic(uuid)`.
class ClientCharacteristic(BaseClientCharacteristic):
def __init__(self, service, end_handle, value_handle, properties, uuid):
self.service = service
self.connection = service.connection
# Used for descriptor discovery. If available, otherwise assume just
# past the value handle (enough for two descriptors without risking
# going into the next characteristic).
self._end_handle = end_handle if end_handle > value_handle else value_handle + 2
super().__init__(value_handle, properties, uuid)
if properties & _FLAG_NOTIFY:
# Fired when a notification arrives.
self._notify_event = asyncio.ThreadSafeFlag()
# Data for the most recent notification.
self._notify_queue = deque((), 1)
if properties & _FLAG_INDICATE:
# Same for indications.
self._indicate_event = asyncio.ThreadSafeFlag()
self._indicate_queue = deque((), 1)
def __str__(self):
return "Characteristic: {} {} {} {}".format(
self._end_handle, self._value_handle, self.properties, self.uuid
)
def _connection(self):
return self.service.connection
# Search for a specific descriptor by uuid.
async def descriptor(self, uuid, timeout_ms=2000):
result = None
# Make sure loop runs to completion.
async for descriptor in self.descriptors(timeout_ms):
if not result and descriptor.uuid == uuid:
# Keep first result.
result = descriptor
return result
# Search for all services (optionally by uuid).
# Use with `async for`, e.g.
# async for descriptor in characteristic.descriptors():
# Note: must allow the loop to run to completion.
def descriptors(self, timeout_ms=2000):
return ClientDiscover(self.connection, ClientDescriptor, self, timeout_ms)
# For ClientDiscover
def _start_discovery(service, uuid=None):
ble.gattc_discover_characteristics(
service.connection._conn_handle,
service._start_handle,
service._end_handle,
uuid,
)
# Helper for notified() and indicated().
async def _notified_indicated(self, queue, event, timeout_ms):
# Ensure that events for this connection can route to this characteristic.
self._register_with_connection()
# If the queue is empty, then we need to wait. However, if the queue
# has a single item, we also need to do a no-op wait in order to
# clear the event flag (because the queue will become empty and
# therefore the event should be cleared).
if len(queue) <= 1:
with self._connection().timeout(timeout_ms):
await event.wait()
# Either we started > 1 item, or the wait completed successfully, return
# the front of the queue.
return queue.popleft()
# Wait for the next notification.
# Will return immediately if a notification has already been received.
async def notified(self, timeout_ms=None):
self._check(_FLAG_NOTIFY)
return await self._notified_indicated(self._notify_queue, self._notify_event, timeout_ms)
def _on_notify_indicate(self, queue, event, data):
# If we've gone from empty to one item, then wake something
# blocking on `await char.notified()` (or `await char.indicated()`).
wake = len(queue) == 0
# Append the data. By default this is a deque with max-length==1, so it
# replaces. But if capture is enabled then it will append.
queue.append(data)
if wake:
# Queue is now non-empty. If something is waiting, it will be
# worken. If something isn't waiting right now, then a future
# caller to `await char.written()` will see the queue is
# non-empty, and wait on the event if it's going to empty the
# queue.
event.set()
# Map an incoming notify IRQ to a registered characteristic.
def _on_notify(conn_handle, value_handle, notify_data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._on_notify_indicate(
characteristic._notify_queue, characteristic._notify_event, notify_data
)
# Wait for the next indication.
# Will return immediately if an indication has already been received.
async def indicated(self, timeout_ms=None):
self._check(_FLAG_INDICATE)
return await self._notified_indicated(
self._indicate_queue, self._indicate_event, timeout_ms
)
# Map an incoming indicate IRQ to a registered characteristic.
def _on_indicate(conn_handle, value_handle, indicate_data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._on_notify_indicate(
characteristic._indicate_queue, characteristic._indicate_event, indicate_data
)
# Write to the Client Characteristic Configuration to subscribe to
# notify/indications for this characteristic.
async def subscribe(self, notify=True, indicate=False):
# Ensure that the generated notifications are dispatched in case the app
# hasn't awaited on notified/indicated yet.
self._register_with_connection()
if cccd := await self.descriptor(bluetooth.UUID(_CCCD_UUID)):
await cccd.write(struct.pack("<H", _CCCD_NOTIFY * notify + _CCCD_INDICATE * indicate))
else:
raise ValueError("CCCD not found")
# Represents a single descriptor supported by a characteristic. Do not construct
# this class directly, instead use `async for descriptors in
# characteristic.descriptors([uuid])` or `await characteristic.descriptor(uuid)`.
class ClientDescriptor(BaseClientCharacteristic):
def __init__(self, characteristic, dsc_handle, uuid):
self.characteristic = characteristic
super().__init__(dsc_handle, _FLAG_READ | _FLAG_WRITE_NO_RESPONSE, uuid)
def __str__(self):
return "Descriptor: {} {} {}".format(self._value_handle, self.properties, self.uuid)
def _connection(self):
return self.characteristic.service.connection
# For ClientDiscover
def _start_discovery(characteristic, uuid=None):
ble.gattc_discover_descriptors(
characteristic._connection()._conn_handle,
characteristic._value_handle,
characteristic._end_handle,
)

78
ble/aioble/core.py Normal file
View File

@ -0,0 +1,78 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
import bluetooth
log_level = 1
def log_error(*args):
if log_level > 0:
print("[aioble] E:", *args)
def log_warn(*args):
if log_level > 1:
print("[aioble] W:", *args)
def log_info(*args):
if log_level > 2:
print("[aioble] I:", *args)
class GattError(Exception):
def __init__(self, status):
self._status = status
def ensure_active():
if not ble.active():
try:
from .security import load_secrets
load_secrets()
except:
pass
ble.active(True)
def config(*args, **kwargs):
ensure_active()
return ble.config(*args, **kwargs)
# Because different functionality is enabled by which files are available the
# different modules can register their IRQ handlers and shutdown handlers
# dynamically.
_irq_handlers = []
_shutdown_handlers = []
def register_irq_handler(irq, shutdown):
if irq:
_irq_handlers.append(irq)
if shutdown:
_shutdown_handlers.append(shutdown)
def stop():
ble.active(False)
for handler in _shutdown_handlers:
handler()
# Dispatch IRQs to the registered sub-modules.
def ble_irq(event, data):
log_info(event, data)
for handler in _irq_handlers:
result = handler(event, data)
if result is not None:
return result
# TODO: Allow this to be injected.
ble = bluetooth.BLE()
ble.irq(ble_irq)

295
ble/aioble/device.py Normal file
View File

@ -0,0 +1,295 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import uasyncio as asyncio
import binascii
from .core import ble, register_irq_handler, log_error
_IRQ_MTU_EXCHANGED = const(21)
# Raised by `with device.timeout()`.
class DeviceDisconnectedError(Exception):
pass
def _device_irq(event, data):
if event == _IRQ_MTU_EXCHANGED:
conn_handle, mtu = data
if device := DeviceConnection._connected.get(conn_handle, None):
device.mtu = mtu
if device._mtu_event:
device._mtu_event.set()
register_irq_handler(_device_irq, None)
# Context manager to allow an operation to be cancelled by timeout or device
# disconnection. Don't use this directly -- use `with connection.timeout(ms):`
# instead.
class DeviceTimeout:
def __init__(self, connection, timeout_ms):
self._connection = connection
self._timeout_ms = timeout_ms
# We allow either (or both) connection and timeout_ms to be None. This
# allows this to be used either as a just-disconnect, just-timeout, or
# no-op.
# This task is active while the operation is in progress. It sleeps
# until the timeout, and then cancels the working task. If the working
# task completes, __exit__ will cancel the sleep.
self._timeout_task = None
# This is the task waiting for the actual operation to complete.
# Usually this is waiting on an event that will be set() by an IRQ
# handler.
self._task = asyncio.current_task()
# Tell the connection that if it disconnects, it should cancel this
# operation (by cancelling self._task).
if connection:
connection._timeouts.append(self)
async def _timeout_sleep(self):
try:
await asyncio.sleep_ms(self._timeout_ms)
except asyncio.CancelledError:
# The operation completed successfully and this timeout task was
# cancelled by __exit__.
return
# The sleep completed, so we should trigger the timeout. Set
# self._timeout_task to None so that we can tell the difference
# between a disconnect and a timeout in __exit__.
self._timeout_task = None
self._task.cancel()
def __enter__(self):
if self._timeout_ms:
# Schedule the timeout waiter.
self._timeout_task = asyncio.create_task(self._timeout_sleep())
def __exit__(self, exc_type, exc_val, exc_traceback):
# One of five things happened:
# 1 - The operation completed successfully.
# 2 - The operation timed out.
# 3 - The device disconnected.
# 4 - The operation failed for a different exception.
# 5 - The task was cancelled by something else.
# Don't need the connection to tell us about disconnection anymore.
if self._connection:
self._connection._timeouts.remove(self)
try:
if exc_type == asyncio.CancelledError:
# Case 2, we started a timeout and it's completed.
if self._timeout_ms and self._timeout_task is None:
raise asyncio.TimeoutError
# Case 3, we have a disconnected device.
if self._connection and self._connection._conn_handle is None:
raise DeviceDisconnectedError
# Case 5, something else cancelled us.
# Allow the cancellation to propagate.
return
# Case 1 & 4. Either way, just stop the timeout task and let the
# exception (if case 4) propagate.
finally:
# In all cases, if the timeout is still running, cancel it.
if self._timeout_task:
self._timeout_task.cancel()
class Device:
def __init__(self, addr_type, addr):
# Public properties
self.addr_type = addr_type
self.addr = addr if len(addr) == 6 else binascii.unhexlify(addr.replace(":", ""))
self._connection = None
def __eq__(self, rhs):
return self.addr_type == rhs.addr_type and self.addr == rhs.addr
def __hash__(self):
return hash((self.addr_type, self.addr))
def __str__(self):
return "Device({}, {}{})".format(
"ADDR_PUBLIC" if self.addr_type == 0 else "ADDR_RANDOM",
self.addr_hex(),
", CONNECTED" if self._connection else "",
)
def addr_hex(self):
return binascii.hexlify(self.addr, ":").decode()
async def connect(self, timeout_ms=10000):
if self._connection:
return self._connection
# Forward to implementation in central.py.
from .central import _connect
await _connect(DeviceConnection(self), timeout_ms)
# Start the device task that will clean up after disconnection.
self._connection._run_task()
return self._connection
class DeviceConnection:
# Global map of connection handle to active devices (for IRQ mapping).
_connected = {}
def __init__(self, device):
self.device = device
device._connection = self
self.encrypted = False
self.authenticated = False
self.bonded = False
self.key_size = False
self.mtu = None
self._conn_handle = None
# This event is fired by the IRQ both for connection and disconnection
# and controls the device_task.
self._event = None
# If we're waiting for a pending MTU exchange.
self._mtu_event = None
# In-progress client discovery instance (e.g. services, chars,
# descriptors) used for IRQ mapping.
self._discover = None
# Map of value handle to characteristic (so that IRQs with
# conn_handle,value_handle can route to them). See
# ClientCharacteristic._find for where this is used.
self._characteristics = {}
self._task = None
# DeviceTimeout instances that are currently waiting on this device
# and need to be notified if disconnection occurs.
self._timeouts = []
# Fired by the encryption update event.
self._pair_event = None
# Active L2CAP channel for this device.
# TODO: Support more than one concurrent channel.
self._l2cap_channel = None
# While connected, this tasks waits for disconnection then cleans up.
async def device_task(self):
assert self._conn_handle is not None
# Wait for the (either central or peripheral) disconnected irq.
await self._event.wait()
# Mark the device as disconnected.
del DeviceConnection._connected[self._conn_handle]
self._conn_handle = None
self.device._connection = None
# Cancel any in-progress operations on this device.
for t in self._timeouts:
t._task.cancel()
def _run_task(self):
# Event will be already created this if we initiated connection.
self._event = self._event or asyncio.ThreadSafeFlag()
self._task = asyncio.create_task(self.device_task())
async def disconnect(self, timeout_ms=2000):
await self.disconnected(timeout_ms, disconnect=True)
async def disconnected(self, timeout_ms=60000, disconnect=False):
if not self.is_connected():
return
# The task must have been created after successful connection.
assert self._task
if disconnect:
try:
ble.gap_disconnect(self._conn_handle)
except OSError as e:
log_error("Disconnect", e)
with DeviceTimeout(None, timeout_ms):
await self._task
# Retrieve a single service matching this uuid.
async def service(self, uuid, timeout_ms=2000):
result = None
# Make sure loop runs to completion.
async for service in self.services(uuid, timeout_ms):
if not result and service.uuid == uuid:
result = service
return result
# Search for all services (optionally by uuid).
# Use with `async for`, e.g.
# async for service in device.services():
# Note: must allow the loop to run to completion.
# TODO: disconnection / timeout
def services(self, uuid=None, timeout_ms=2000):
from .client import ClientDiscover, ClientService
return ClientDiscover(self, ClientService, self, timeout_ms, uuid)
async def pair(self, *args, **kwargs):
from .security import pair
await pair(self, *args, **kwargs)
def is_connected(self):
return self._conn_handle is not None
# Use with `with` to simplify disconnection and timeout handling.
def timeout(self, timeout_ms):
return DeviceTimeout(self, timeout_ms)
async def exchange_mtu(self, mtu=None, timeout_ms=1000):
if not self.is_connected():
raise ValueError("Not connected")
if mtu:
ble.config(mtu=mtu)
self._mtu_event = self._mtu_event or asyncio.ThreadSafeFlag()
ble.gattc_exchange_mtu(self._conn_handle)
with self.timeout(timeout_ms):
await self._mtu_event.wait()
return self.mtu
# Wait for a connection on an L2CAP connection-oriented-channel.
async def l2cap_accept(self, psm, mtu, timeout_ms=None):
from .l2cap import accept
return await accept(self, psm, mtu, timeout_ms)
# Attempt to connect to a listening device.
async def l2cap_connect(self, psm, mtu, timeout_ms=1000):
from .l2cap import connect
return await connect(self, psm, mtu, timeout_ms)
# Context manager -- automatically disconnect.
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_traceback):
await self.disconnect()

214
ble/aioble/l2cap.py Normal file
View File

@ -0,0 +1,214 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import uasyncio as asyncio
from .core import ble, log_error, register_irq_handler
from .device import DeviceConnection
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)
# Once we start listening we're listening forever. (Limitation in NimBLE)
_listening = False
def _l2cap_irq(event, data):
if event not in (
_IRQ_L2CAP_CONNECT,
_IRQ_L2CAP_DISCONNECT,
_IRQ_L2CAP_RECV,
_IRQ_L2CAP_SEND_READY,
):
return
# All the L2CAP events start with (conn_handle, cid, ...)
if connection := DeviceConnection._connected.get(data[0], None):
if channel := connection._l2cap_channel:
# Expect to match the cid for this conn handle (unless we're
# waiting for connection in which case channel._cid is None).
if channel._cid is not None and channel._cid != data[1]:
return
# Update the channel object with new information.
if event == _IRQ_L2CAP_CONNECT:
_, channel._cid, _, channel.our_mtu, channel.peer_mtu = data
elif event == _IRQ_L2CAP_DISCONNECT:
_, _, psm, status = data
channel._status = status
channel._cid = None
connection._l2cap_channel = None
elif event == _IRQ_L2CAP_RECV:
channel._data_ready = True
elif event == _IRQ_L2CAP_SEND_READY:
channel._stalled = False
# Notify channel.
channel._event.set()
def _l2cap_shutdown():
global _listening
_listening = False
register_irq_handler(_l2cap_irq, _l2cap_shutdown)
# The channel was disconnected during a send/recvinto/flush.
class L2CAPDisconnectedError(Exception):
pass
# Failed to connect to connection (argument is status).
class L2CAPConnectionError(Exception):
pass
class L2CAPChannel:
def __init__(self, connection):
if not connection.is_connected():
raise ValueError("Not connected")
if connection._l2cap_channel:
raise ValueError("Already has channel")
connection._l2cap_channel = self
self._connection = connection
# Maximum size that the other side can send to us.
self.our_mtu = 0
# Maximum size that we can send.
self.peer_mtu = 0
# Set back to None on disconnection.
self._cid = None
# Set during disconnection.
self._status = 0
# If true, must wait for _IRQ_L2CAP_SEND_READY IRQ before sending.
self._stalled = False
# Has received a _IRQ_L2CAP_RECV since the buffer was last emptied.
self._data_ready = False
self._event = asyncio.ThreadSafeFlag()
def _assert_connected(self):
if self._cid is None:
raise L2CAPDisconnectedError
async def recvinto(self, buf, timeout_ms=None):
self._assert_connected()
# Wait until the data_ready flag is set. This flag is only ever set by
# the event and cleared by this function.
with self._connection.timeout(timeout_ms):
while not self._data_ready:
await self._event.wait()
self._assert_connected()
self._assert_connected()
# Extract up to len(buf) bytes from the channel buffer.
n = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, buf)
# Check if there's still remaining data in the channel buffers.
self._data_ready = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, None) > 0
return n
# Synchronously see if there's data ready.
def available(self):
self._assert_connected()
return self._data_ready
# Waits until the channel is free and then sends buf.
# If the buffer is larger than the MTU it will be sent in chunks.
async def send(self, buf, timeout_ms=None, chunk_size=None):
self._assert_connected()
offset = 0
chunk_size = min(self.our_mtu * 2, self.peer_mtu, chunk_size or self.peer_mtu)
mv = memoryview(buf)
while offset < len(buf):
if self._stalled:
await self.flush(timeout_ms)
# l2cap_send returns True if you can send immediately.
self._stalled = not ble.l2cap_send(
self._connection._conn_handle,
self._cid,
mv[offset : offset + chunk_size],
)
offset += chunk_size
async def flush(self, timeout_ms=None):
self._assert_connected()
# Wait for the _stalled flag to be cleared by the IRQ.
with self._connection.timeout(timeout_ms):
while self._stalled:
await self._event.wait()
self._assert_connected()
async def disconnect(self, timeout_ms=1000):
if self._cid is None:
return
# Wait for the cid to be cleared by the disconnect IRQ.
ble.l2cap_disconnect(self._connection._conn_handle, self._cid)
await self.disconnected(timeout_ms)
async def disconnected(self, timeout_ms=1000):
with self._connection.timeout(timeout_ms):
while self._cid is not None:
await self._event.wait()
# Context manager -- automatically disconnect.
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_traceback):
await self.disconnect()
# Use connection.l2cap_accept() instead of calling this directly.
async def accept(connection, psm, mtu, timeout_ms):
global _listening
channel = L2CAPChannel(connection)
# Start the stack listening if necessary.
if not _listening:
ble.l2cap_listen(psm, mtu)
_listening = True
# Wait for the connect irq from the remote connection.
with connection.timeout(timeout_ms):
await channel._event.wait()
return channel
# Use connection.l2cap_connect() instead of calling this directly.
async def connect(connection, psm, mtu, timeout_ms):
if _listening:
raise ValueError("Can't connect while listening")
channel = L2CAPChannel(connection)
with connection.timeout(timeout_ms):
ble.l2cap_connect(connection._conn_handle, psm, mtu)
# Wait for the connect irq from the remote connection.
# If the connection fails, we get a disconnect event (with status) instead.
await channel._event.wait()
if channel._cid is not None:
return channel
else:
raise L2CAPConnectionError(channel._status)

179
ble/aioble/peripheral.py Normal file
View File

@ -0,0 +1,179 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import bluetooth
import struct
import uasyncio as asyncio
from .core import (
ensure_active,
ble,
log_info,
log_error,
log_warn,
register_irq_handler,
)
from .device import Device, DeviceConnection, DeviceTimeout
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
_ADV_TYPE_MANUFACTURER = const(0xFF)
_ADV_PAYLOAD_MAX_LEN = const(31)
_incoming_connection = None
_connect_event = None
def _peripheral_irq(event, data):
global _incoming_connection
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, addr_type, addr = data
# Create, initialise, and register the device.
device = Device(addr_type, bytes(addr))
_incoming_connection = DeviceConnection(device)
_incoming_connection._conn_handle = conn_handle
DeviceConnection._connected[conn_handle] = _incoming_connection
# Signal advertise() to return the connected device.
_connect_event.set()
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _ = data
if connection := DeviceConnection._connected.get(conn_handle, None):
# Tell the device_task that it should terminate.
connection._event.set()
def _peripheral_shutdown():
global _incoming_connection, _connect_event
_incoming_connection = None
_connect_event = None
register_irq_handler(_peripheral_irq, _peripheral_shutdown)
# Advertising payloads are repeated packets of the following form:
# 1 byte data length (N + 1)
# 1 byte type (see constants below)
# N bytes type-specific data
def _append(adv_data, resp_data, adv_type, value):
data = struct.pack("BB", len(value) + 1, adv_type) + value
if len(data) + len(adv_data) < _ADV_PAYLOAD_MAX_LEN:
adv_data += data
return resp_data
if len(data) + (len(resp_data) if resp_data else 0) < _ADV_PAYLOAD_MAX_LEN:
if not resp_data:
# Overflow into resp_data for the first time.
resp_data = bytearray()
resp_data += data
return resp_data
raise ValueError("Advertising payload too long")
async def advertise(
interval_us,
adv_data=None,
resp_data=None,
connectable=True,
limited_disc=False,
br_edr=False,
name=None,
services=None,
appearance=0,
manufacturer=None,
timeout_ms=None,
):
global _incoming_connection, _connect_event
ensure_active()
if not adv_data and not resp_data:
# If the user didn't manually specify adv_data / resp_data then
# construct them from the kwargs. Keep adding fields to adv_data,
# overflowing to resp_data if necessary.
# TODO: Try and do better bin-packing than just concatenating in
# order?
adv_data = bytearray()
resp_data = _append(
adv_data,
resp_data,
_ADV_TYPE_FLAGS,
struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
)
# Services are prioritised to go in the advertising data because iOS supports
# filtering scan results by service only, so services must come first.
if services:
for uuid in services:
b = bytes(uuid)
if len(b) == 2:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID16_COMPLETE, b)
elif len(b) == 4:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID32_COMPLETE, b)
elif len(b) == 16:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID128_COMPLETE, b)
if name:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_NAME, name)
if appearance:
# See org.bluetooth.characteristic.gap.appearance.xml
resp_data = _append(
adv_data, resp_data, _ADV_TYPE_APPEARANCE, struct.pack("<H", appearance)
)
if manufacturer:
resp_data = _append(
adv_data,
resp_data,
_ADV_TYPE_MANUFACTURER,
struct.pack("<H", manufacturer[0]) + manufacturer[1],
)
_connect_event = _connect_event or asyncio.ThreadSafeFlag()
ble.gap_advertise(interval_us, adv_data=adv_data, resp_data=resp_data, connectable=connectable)
try:
# Allow optional timeout for a central to connect to us (or just to stop advertising).
with DeviceTimeout(None, timeout_ms):
await _connect_event.wait()
# Get the newly connected connection to the central and start a task
# to wait for disconnection.
result = _incoming_connection
_incoming_connection = None
# This mirrors what connecting to a central does.
result._run_task()
return result
except asyncio.CancelledError:
# Something else cancelled this task (to manually stop advertising).
ble.gap_advertise(None)
except asyncio.TimeoutError:
# DeviceTimeout waiting for connection.
ble.gap_advertise(None)
raise

178
ble/aioble/security.py Normal file
View File

@ -0,0 +1,178 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const, schedule
import uasyncio as asyncio
import binascii
import json
from .core import log_info, log_warn, ble, register_irq_handler
from .device import DeviceConnection
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)
_IRQ_PASSKEY_ACTION = const(31)
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISP = const(3)
_PASSKEY_ACTION_NUMCMP = const(4)
_DEFAULT_PATH = "ble_secrets.json"
_secrets = {}
_modified = False
_path = None
# Must call this before stack startup.
def load_secrets(path=None):
global _path, _secrets
# Use path if specified, otherwise use previous path, otherwise use
# default path.
_path = path or _path or _DEFAULT_PATH
# Reset old secrets.
_secrets = {}
try:
with open(_path, "r") as f:
entries = json.load(f)
for sec_type, key, value in entries:
# Decode bytes from hex.
_secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
except:
log_warn("No secrets available")
# Call this whenever the secrets dict changes.
def _save_secrets(arg=None):
global _modified, _path
_path = _path or _DEFAULT_PATH
if not _modified:
# Only save if the secrets changed.
return
with open(_path, "w") as f:
# Convert bytes to hex strings (otherwise JSON will treat them like
# strings).
json_secrets = [
(sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
for (sec_type, key), value in _secrets.items()
]
json.dump(json_secrets, f)
_modified = False
def _security_irq(event, data):
global _modified
if event == _IRQ_ENCRYPTION_UPDATE:
# Connection has updated (usually due to pairing).
conn_handle, encrypted, authenticated, bonded, key_size = data
log_info("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
if connection := DeviceConnection._connected.get(conn_handle, None):
connection.encrypted = encrypted
connection.authenticated = authenticated
connection.bonded = bonded
connection.key_size = key_size
# TODO: Handle failure.
if encrypted and connection._pair_event:
connection._pair_event.set()
elif event == _IRQ_SET_SECRET:
sec_type, key, value = data
key = sec_type, bytes(key)
value = bytes(value) if value else None
log_info("set secret:", key, value)
if value is None:
# Delete secret.
if key not in _secrets:
return False
del _secrets[key]
else:
# Save secret.
_secrets[key] = value
# Queue up a save (don't synchronously write to flash).
_modified = True
schedule(_save_secrets, None)
return True
elif event == _IRQ_GET_SECRET:
sec_type, index, key = data
log_info("get secret:", sec_type, index, bytes(key) if key else None)
if key is None:
# Return the index'th secret of this type.
i = 0
for (t, _key), value in _secrets.items():
if t == sec_type:
if i == index:
return value
i += 1
return None
else:
# Return the secret for this key (or None).
key = sec_type, bytes(key)
return _secrets.get(key, None)
elif event == _IRQ_PASSKEY_ACTION:
conn_handle, action, passkey = data
log_info("passkey action", conn_handle, action, passkey)
# if action == _PASSKEY_ACTION_NUMCMP:
# # TODO: Show this passkey and confirm accept/reject.
# accept = 1
# self._ble.gap_passkey(conn_handle, action, accept)
# elif action == _PASSKEY_ACTION_DISP:
# # TODO: Generate and display a passkey so the remote device can enter it.
# passkey = 123456
# self._ble.gap_passkey(conn_handle, action, passkey)
# elif action == _PASSKEY_ACTION_INPUT:
# # TODO: Ask the user to enter the passkey shown on the remote device.
# passkey = 123456
# self._ble.gap_passkey(conn_handle, action, passkey)
# else:
# log_warn("unknown passkey action")
def _security_shutdown():
global _secrets, _modified, _path
_secrets = {}
_modified = False
_path = None
register_irq_handler(_security_irq, _security_shutdown)
# Use device.pair() rather than calling this directly.
async def pair(
connection,
bond=True,
le_secure=True,
mitm=False,
io=_IO_CAPABILITY_NO_INPUT_OUTPUT,
timeout_ms=20000,
):
ble.config(bond=bond, le_secure=le_secure, mitm=mitm, io=io)
with connection.timeout(timeout_ms):
connection._pair_event = asyncio.ThreadSafeFlag()
ble.gap_pair(connection._conn_handle)
await connection._pair_event.wait()
# TODO: Allow the passkey action to return to here and
# invoke a callback or task to process the action.

344
ble/aioble/server.py Normal file
View File

@ -0,0 +1,344 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
from collections import deque
import bluetooth
import uasyncio as asyncio
from .core import (
ensure_active,
ble,
log_info,
log_error,
log_warn,
register_irq_handler,
GattError,
)
from .device import DeviceConnection, DeviceTimeout
_registered_characteristics = {}
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_GATTS_INDICATE_DONE = const(20)
_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
_FLAG_READ_ENCRYPTED = const(0x0200)
_FLAG_READ_AUTHENTICATED = const(0x0400)
_FLAG_READ_AUTHORIZED = const(0x0800)
_FLAG_WRITE_ENCRYPTED = const(0x1000)
_FLAG_WRITE_AUTHENTICATED = const(0x2000)
_FLAG_WRITE_AUTHORIZED = const(0x4000)
_FLAG_WRITE_CAPTURE = const(0x10000)
_FLAG_DESC_READ = const(1)
_FLAG_DESC_WRITE = const(2)
_WRITE_CAPTURE_QUEUE_LIMIT = const(10)
def _server_irq(event, data):
if event == _IRQ_GATTS_WRITE:
conn_handle, attr_handle = data
Characteristic._remote_write(conn_handle, attr_handle)
elif event == _IRQ_GATTS_READ_REQUEST:
conn_handle, attr_handle = data
return Characteristic._remote_read(conn_handle, attr_handle)
elif event == _IRQ_GATTS_INDICATE_DONE:
conn_handle, value_handle, status = data
Characteristic._indicate_done(conn_handle, value_handle, status)
def _server_shutdown():
global _registered_characteristics
_registered_characteristics = {}
if hasattr(BaseCharacteristic, "_capture_task"):
BaseCharacteristic._capture_task.cancel()
del BaseCharacteristic._capture_queue
del BaseCharacteristic._capture_write_event
del BaseCharacteristic._capture_consumed_event
del BaseCharacteristic._capture_task
register_irq_handler(_server_irq, _server_shutdown)
class Service:
def __init__(self, uuid):
self.uuid = uuid
self.characteristics = []
# Generate tuple for gatts_register_services.
def _tuple(self):
return (self.uuid, tuple(c._tuple() for c in self.characteristics))
class BaseCharacteristic:
def _register(self, value_handle):
self._value_handle = value_handle
_registered_characteristics[value_handle] = self
if self._initial is not None:
self.write(self._initial)
self._initial = None
# Read value from local db.
def read(self):
if self._value_handle is None:
return self._initial or b""
else:
return ble.gatts_read(self._value_handle)
# Write value to local db, and optionally notify/indicate subscribers.
def write(self, data, send_update=False):
if self._value_handle is None:
self._initial = data
else:
ble.gatts_write(self._value_handle, data, send_update)
# When the a capture-enabled characteristic is created, create the
# necessary events (if not already created).
@staticmethod
def _init_capture():
if hasattr(BaseCharacteristic, "_capture_queue"):
return
BaseCharacteristic._capture_queue = deque((), _WRITE_CAPTURE_QUEUE_LIMIT)
BaseCharacteristic._capture_write_event = asyncio.ThreadSafeFlag()
BaseCharacteristic._capture_consumed_event = asyncio.ThreadSafeFlag()
BaseCharacteristic._capture_task = asyncio.create_task(
BaseCharacteristic._run_capture_task()
)
# Monitor the shared queue for incoming characteristic writes and forward
# them sequentially to the individual characteristic events.
@staticmethod
async def _run_capture_task():
write = BaseCharacteristic._capture_write_event
consumed = BaseCharacteristic._capture_consumed_event
q = BaseCharacteristic._capture_queue
while True:
if len(q):
conn, data, characteristic = q.popleft()
# Let the characteristic waiting in `written()` know that it
# can proceed.
characteristic._write_data = (conn, data)
characteristic._write_event.set()
# Wait for the characteristic to complete `written()` before
# continuing.
await consumed.wait()
if not len(q):
await write.wait()
# Wait for a write on this characteristic. Returns the connection that did
# the write, or a tuple of (connection, value) if capture is enabled for
# this characteristics.
async def written(self, timeout_ms=None):
if not hasattr(self, "_write_event"):
# Not a writable characteristic.
return
# If no write has been seen then we need to wait. If the event has
# already been set this will clear the event and continue
# immediately. In regular mode, this is set by the write IRQ
# directly (in _remote_write). In capture mode, this is set when it's
# our turn by _capture_task.
with DeviceTimeout(None, timeout_ms):
await self._write_event.wait()
# Return the write data and clear the stored copy.
# In default usage this will be just the connection handle.
# In capture mode this will be a tuple of (connection_handle, received_data)
data = self._write_data
self._write_data = None
if self.flags & _FLAG_WRITE_CAPTURE:
# Notify the shared queue monitor that the event has been consumed
# by the caller to `written()` and another characteristic can now
# proceed.
BaseCharacteristic._capture_consumed_event.set()
return data
def on_read(self, connection):
return 0
def _remote_write(conn_handle, value_handle):
if characteristic := _registered_characteristics.get(value_handle, None):
# If we've gone from empty to one item, then wake something
# blocking on `await char.written()`.
conn = DeviceConnection._connected.get(conn_handle, None)
if characteristic.flags & _FLAG_WRITE_CAPTURE:
# For capture, we append the connection and the written value
# value to the shared queue along with the matching characteristic object.
# The deque will enforce the max queue len.
data = characteristic.read()
BaseCharacteristic._capture_queue.append((conn, data, characteristic))
BaseCharacteristic._capture_write_event.set()
else:
# Store the write connection handle to be later used to retrieve the data
# then set event to handle in written() task.
characteristic._write_data = conn
characteristic._write_event.set()
def _remote_read(conn_handle, value_handle):
if characteristic := _registered_characteristics.get(value_handle, None):
return characteristic.on_read(DeviceConnection._connected.get(conn_handle, None))
class Characteristic(BaseCharacteristic):
def __init__(
self,
service,
uuid,
read=False,
write=False,
write_no_response=False,
notify=False,
indicate=False,
initial=None,
capture=False,
):
service.characteristics.append(self)
self.descriptors = []
flags = 0
if read:
flags |= _FLAG_READ
if write or write_no_response:
flags |= (_FLAG_WRITE if write else 0) | (
_FLAG_WRITE_NO_RESPONSE if write_no_response else 0
)
if capture:
# Capture means that we keep track of all writes, and capture
# their values (and connection) in a queue. Otherwise we just
# track the connection of the most recent write.
flags |= _FLAG_WRITE_CAPTURE
BaseCharacteristic._init_capture()
# Set when this characteristic has a value waiting in self._write_data.
self._write_event = asyncio.ThreadSafeFlag()
# The connection of the most recent write, or a tuple of
# (connection, data) if capture is enabled.
self._write_data = None
if notify:
flags |= _FLAG_NOTIFY
if indicate:
flags |= _FLAG_INDICATE
# TODO: This should probably be a dict of connection to (ev, status).
# Right now we just support a single indication at a time.
self._indicate_connection = None
self._indicate_event = asyncio.ThreadSafeFlag()
self._indicate_status = None
self.uuid = uuid
self.flags = flags
self._value_handle = None
self._initial = initial
# Generate tuple for gatts_register_services.
def _tuple(self):
if self.descriptors:
return (self.uuid, self.flags, tuple(d._tuple() for d in self.descriptors))
else:
# Workaround: v1.19 and below can't handle an empty descriptor tuple.
return (self.uuid, self.flags)
def notify(self, connection, data=None):
if not (self.flags & _FLAG_NOTIFY):
raise ValueError("Not supported")
ble.gatts_notify(connection._conn_handle, self._value_handle, data)
async def indicate(self, connection, timeout_ms=1000):
if not (self.flags & _FLAG_INDICATE):
raise ValueError("Not supported")
if self._indicate_connection is not None:
raise ValueError("In progress")
if not connection.is_connected():
raise ValueError("Not connected")
self._indicate_connection = connection
self._indicate_status = None
try:
with connection.timeout(timeout_ms):
ble.gatts_indicate(connection._conn_handle, self._value_handle)
await self._indicate_event.wait()
if self._indicate_status != 0:
raise GattError(self._indicate_status)
finally:
self._indicate_connection = None
def _indicate_done(conn_handle, value_handle, status):
if characteristic := _registered_characteristics.get(value_handle, None):
if connection := DeviceConnection._connected.get(conn_handle, None):
if not characteristic._indicate_connection:
# Timeout.
return
# See TODO in __init__ to support multiple concurrent indications.
assert connection == characteristic._indicate_connection
characteristic._indicate_status = status
characteristic._indicate_event.set()
class BufferedCharacteristic(Characteristic):
def __init__(self, service, uuid, max_len=20, append=False):
super().__init__(service, uuid, read=True)
self._max_len = max_len
self._append = append
def _register(self, value_handle):
super()._register(value_handle)
ble.gatts_set_buffer(value_handle, self._max_len, self._append)
class Descriptor(BaseCharacteristic):
def __init__(self, characteristic, uuid, read=False, write=False, initial=None):
characteristic.descriptors.append(self)
# Workaround for https://github.com/micropython/micropython/issues/6864
flags = 0
if read:
flags |= _FLAG_DESC_READ
if write:
self._write_event = asyncio.ThreadSafeFlag()
self._write_data = None
flags |= _FLAG_DESC_WRITE
self.uuid = uuid
self.flags = flags
self._value_handle = None
self._initial = initial
# Generate tuple for gatts_register_services.
def _tuple(self):
return (self.uuid, self.flags)
# Turn the Service/Characteristic/Descriptor classes into a registration tuple
# and then extract their value handles.
def register_services(*services):
ensure_active()
_registered_characteristics.clear()
handles = ble.gatts_register_services(tuple(s._tuple() for s in services))
for i in range(len(services)):
service_handles = handles[i]
service = services[i]
n = 0
for characteristic in service.characteristics:
characteristic._register(service_handles[n])
n += 1
for descriptor in characteristic.descriptors:
descriptor._register(service_handles[n])
n += 1

137
ble/mainDongle.py Normal file
View File

@ -0,0 +1,137 @@
import sys
sys.path.append("")
from micropython import const
import json
import uasyncio as asyncio
import aioble
import bluetooth
import struct
_SERVICE_UUID = bluetooth.UUID(0x1234)
_CHAR_UUID = bluetooth.UUID(0x1235)
MAX_MSG_DATA_LENGTH = const(18)
_COMMAND_DONE = const(0)
_COMMAND_SENDDATA = const(1)
_COMMAND_SENDCHUNK = const(2) # send chunk of data, use _COMMAND_SENDDATA for last chunk
_COMMAND_SENDBYTESDATA = const(3)
_COMMAND_SENDBYTESCHUNK = const(4) # send chunk of bytes, use _COMMAND_SENDBYTESDATA for last chunk
class ManageDongle:
def __init__(self, device):
self._device = device
self._connection = None
self._seq = 1
async def connect(self):
try:
print("Connecting to", self._device)
self._connection = await self._device.connect()
except asyncio.TimeoutError:
print("Timeout during connection")
return
try:
print("Discovering...")
service = await self._connection.service(_SERVICE_UUID)
#uuids = []
#async for char in service.characteristics():
# uuids.append(char.uuid)
#print('uuids', uuids)
self._characteristic = await service.characteristic(_CHAR_UUID)
except asyncio.TimeoutError:
print("Timeout discovering services/characteristics")
return
asyncio.create_task(self.readFromBle())
await asyncio.sleep(0.1)
self.sendDictToCom({'type':'connected'})
async def _command(self, cmd, data):
send_seq = self._seq
await self._characteristic.write(struct.pack("<BB", cmd, send_seq) + data)
#print('sent packet num', send_seq)
self._seq += 1
return send_seq
async def readFromBle(self):
msgChunk = ''
while True:
read = await self._characteristic.notified()
# message format is <command><data>
cmd = read[0]
#print('received from BLE', read)
self.sendDictToCom({'type':'debug', 'from':'fromBle','cmd':cmd, 'data':read[1:]})
if cmd in [_COMMAND_SENDCHUNK, _COMMAND_SENDBYTESCHUNK]:
#self.sendDictToCom({'type':'debug', 'from':'chunkFromBle','string':msgChunk})
msgChunk += read[1:].decode()
elif cmd in [_COMMAND_SENDDATA, _COMMAND_SENDBYTESDATA]:
# message to send to computer over COM port
msgFormat = 'base64' if cmd == _COMMAND_SENDBYTESDATA else 'str'
msg = msgChunk + read[1:].decode()
self.sendDictToCom({'type':'msgFromBle', 'format':msgFormat, 'string':msg})
msgChunk = ''
async def sendData(self, data:str, base64:bool=False):
"""Send a string or bytes sequence over BLE
:param data: string to send (plain str or base64 formated)
:param base64: if True, data is a base64 formated string"""
sendMsgType = _COMMAND_SENDBYTESCHUNK if base64 else _COMMAND_SENDCHUNK
while len(data) > MAX_MSG_DATA_LENGTH:
chunk = data[:MAX_MSG_DATA_LENGTH]
self.sendDictToCom({'type':'debug', 'from':'sendChunkToBle','string':chunk})
await self._command(sendMsgType, chunk.encode())
data = data[MAX_MSG_DATA_LENGTH:]
sendMsgType = _COMMAND_SENDBYTESDATA if base64 else _COMMAND_SENDDATA
#self.sendDictToCom({'type':'msgType', 'strOrBase64':sendMsgType, 'sentdata':data})
await self._command(sendMsgType, data.encode())
self.sendDictToCom({'type':'sentMessage'})
async def disconnect(self):
if self._connection:
await self._connection.disconnect()
def sendDictToCom(self, data:dict):
print(json.dumps(data))
async def main():
print('start dongle')
while True:
try:
line = input()
except KeyboardInterrupt:
# when ctrl-C is sent to dongle, we receive a KeyboardInterrupt
sys.exit(0)
#print('dongle received:', line)
receivedMsgDict = json.loads(line)
if receivedMsgDict['type'] == 'connect':
# => start BLE scan and connect on this peripheral
peripheralName = receivedMsgDict['name']
async with aioble.scan(5000, 30000, 30000, active=True) as scanner:
async for result in scanner:
# print('scan', result.name(), result.services())
print('scan', result.name(), result.rssi, result.services())
if result.name() == peripheralName and _SERVICE_UUID in result.services():
device = result.device
break
else:
print("Server not found")
return
client = ManageDongle(device)
await client.connect()
elif receivedMsgDict['type'] == 'disconnect':
await client.disconnect()
elif receivedMsgDict['type'] == 'msg':
#msgFormat = 'base64' in receivedMsgDict
if 'format' not in receivedMsgDict or receivedMsgDict['format'] not in ['str', 'base64']:
client.sendDictToCom({'type':'badMessage', 'error':'invalid format', 'received':receivedMsgDict})
continue
msgFormat = True if receivedMsgDict['format'] == 'base64' else False
await client.sendData(receivedMsgDict['string'], base64=msgFormat)
else:
print('unknown message type', receivedMsgDict)
await client.disconnect()
asyncio.run(main())

85
ble/mainPcTestBLE.py Normal file
View File

@ -0,0 +1,85 @@
# python -m serial.tools.list_ports
# python mainPcTestBLE.py -p <port com>
# In this example, PC will send some messages to robot,
# and verify it receives checksum of these messages from robot
# Note: if message from PC to robot exceeds 18 characters, it will be split in
# several BLE messages, then merged at robot side to get original message
import sys
import binascii
import time
import argparse
import random
import ComWithDongle
robotName = 'myTeamName'
randCharRange = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
expectedToReceive = []
def onMsgFromRobot(data:str):
"""Function to call when a message sent by robot is received
:param data: message"""
print('received msg', data, flush=True)
print('compair to', expectedToReceive, flush=True)
if data in expectedToReceive:
expectedToReceive.remove(data)
print('-not received yet', len(expectedToReceive), expectedToReceive, flush=True)
else:
print('bad message received', data)
print('expected to receive')
for s in expectedToReceive:
print(' ', s)
exit(1)
parser = argparse.ArgumentParser(
description='Script to communicate with STM32WB55 dongle connected on computer')
parser.add_argument('-p', '--portcom', type=str, help='id of com port used')
parser.add_argument('-d', '--debug', action='store_true', help='display debug messages')
parser.add_argument('-l', '--length', type=int, default=16,
help='number of characters to send over BLE, in each message')
parser.add_argument('-n', '--number', type=int, default=5 , help='number of messages to send over BLE')
parser.add_argument('-b', '--bytes', action='store_true', help='send bytes instead of string')
args = parser.parse_args()
try:
print('start main')
# wait BLE connection is established
com = ComWithDongle.ComWithDongle(comPort=args.portcom, peripheralName=robotName,
onMsgReceived=onMsgFromRobot, debug=args.debug)
print('connected to', robotName)
msgId = 0
while True:
if args.bytes:
data = random.randbytes(args.length)
else:
data = ''.join([random.choice(randCharRange) for _ in range(args.length)])
print('send data', len(data), msgId, data, flush=True)
checksum = binascii.crc32(data)
expectedToReceive.append(str(checksum))
com.sendMsg(data)
print('+not received yet', len(expectedToReceive), expectedToReceive, flush=True)
msgId += 1
if msgId >= args.number: break
#time.sleep(0.01)
time.sleep(0.2)
#all messages sent, wait while we receive some messages
com.sendMsg('END')
nbMissing = len(expectedToReceive)
lastNbMissing = 0
while not nbMissing == lastNbMissing:
if nbMissing == 0:
print('all messages received')
exit(0)
print('missing', expectedToReceive, flush=True)
lastNbMissing = nbMissing
com.sendMsg('END')
time.sleep(2)
nbMissing = len(expectedToReceive)
except KeyboardInterrupt:
pass
com.disconnect()
exit(0)

41
ble/mainRobotTestBLE.py Normal file
View File

@ -0,0 +1,41 @@
# to know COM port used when connected on PC:
# python -m serial.tools.list_ports
# in this example, robot will send back to PC the checksum of each message received
import binascii
import uasyncio as asyncio
import RobotBleServer
robotName = 'myTeamName'
toSend = []
def onMsgToRobot(data:str|bytes):
"""Function to call when a message sent by PC is received
:param data: message received"""
checksum = binascii.crc32(data)
print('received', data, '=>', checksum)
toSend.append(str(checksum))
async def robotMainTask(bleConnection):
"""Main function for robot activities
:param bleConnection: object to check BLE connection and send messages"""
while True:
await asyncio.sleep(0.1)
#print('connection', bleConnection.connection)
if not bleConnection.connection: continue
if toSend == []: continue
while not toSend == []:
data = toSend.pop(0)
bleConnection.sendMessage(data)
print('sent', data)
# Run tasks
async def main():
print('Start main')
bleConnection = RobotBleServer.RobotBleServer(robotName=robotName, onMsgReceived=onMsgToRobot)
asyncio.create_task(robotMainTask(bleConnection))
await bleConnection.communicationTask()
asyncio.run(main())

11
ble/toDongle.sh Normal file
View File

@ -0,0 +1,11 @@
#!/usr/bin/bash
drive=$1
if [[($drive == "")]]; then
echo missing drive
exit 1
fi
cp -r aioble $drive/
cp mainDongle.py $drive/main.py
sync

12
ble/toRobot.sh Normal file
View File

@ -0,0 +1,12 @@
#!/usr/bin/bash
drive=$1
if [[($drive == "")]]; then
echo missing drive
exit 1
fi
cp -r aioble $drive/
cp RobotBleServer.py $drive
cp mainRobotTestBLE.py $drive/main.py
sync