Ajout fichiers

This commit is contained in:
Nogard 2026-03-21 19:53:38 +01:00
parent fe9ca66500
commit eb834b8f72
21 changed files with 4154 additions and 0 deletions

32
robot/aioble/__init__.py Normal file
View File

@ -0,0 +1,32 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
from .device import Device, DeviceDisconnectedError
from .core import log_info, log_warn, log_error, GattError, config, stop
try:
from .peripheral import advertise
except:
log_info("Peripheral support disabled")
try:
from .central import scan
except:
log_info("Central support disabled")
try:
from .server import (
Service,
Characteristic,
BufferedCharacteristic,
Descriptor,
register_services,
)
except:
log_info("GATT server support disabled")
ADDR_PUBLIC = const(0)
ADDR_RANDOM = const(1)

297
robot/aioble/central.py Normal file
View File

@ -0,0 +1,297 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import bluetooth
import struct
import uasyncio as asyncio
from .core import (
ensure_active,
ble,
log_info,
log_error,
log_warn,
register_irq_handler,
)
from .device import Device, DeviceConnection, DeviceTimeout
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_ADV_IND = const(0)
_ADV_DIRECT_IND = const(1)
_ADV_SCAN_IND = const(2)
_ADV_NONCONN_IND = const(3)
_SCAN_RSP = const(4)
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_SHORT_NAME = const(0x08)
_ADV_TYPE_UUID16_INCOMPLETE = const(0x2)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_INCOMPLETE = const(0x4)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_INCOMPLETE = const(0x6)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_APPEARANCE = const(0x19)
_ADV_TYPE_MANUFACTURER = const(0xFF)
# Keep track of the active scanner so IRQs can be delivered to it.
_active_scanner = None
# Set of devices that are waiting for the peripheral connect IRQ.
_connecting = set()
def _central_irq(event, data):
# Send results and done events to the active scanner instance.
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
if not _active_scanner:
return
_active_scanner._queue.append((addr_type, bytes(addr), adv_type, rssi, bytes(adv_data)))
_active_scanner._event.set()
elif event == _IRQ_SCAN_DONE:
if not _active_scanner:
return
_active_scanner._done = True
_active_scanner._event.set()
# Peripheral connect must be in response to a pending connection, so find
# it in the pending connection set.
elif event == _IRQ_PERIPHERAL_CONNECT:
conn_handle, addr_type, addr = data
for d in _connecting:
if d.addr_type == addr_type and d.addr == addr:
# Allow connect() to complete.
connection = d._connection
connection._conn_handle = conn_handle
connection._event.set()
break
# Find the active device connection for this connection handle.
elif event == _IRQ_PERIPHERAL_DISCONNECT:
conn_handle, _, _ = data
if connection := DeviceConnection._connected.get(conn_handle, None):
# Tell the device_task that it should terminate.
connection._event.set()
def _central_shutdown():
global _active_scanner, _connecting
_active_scanner = None
_connecting = set()
register_irq_handler(_central_irq, _central_shutdown)
# Cancel an in-progress scan.
async def _cancel_pending():
if _active_scanner:
await _active_scanner.cancel()
# Start connecting to a peripheral.
# Call device.connect() rather than using method directly.
async def _connect(connection, timeout_ms):
device = connection.device
if device in _connecting:
return
# Enable BLE and cancel in-progress scans.
ensure_active()
await _cancel_pending()
# Allow the connected IRQ to find the device by address.
_connecting.add(device)
# Event will be set in the connected IRQ, and then later
# re-used to notify disconnection.
connection._event = connection._event or asyncio.ThreadSafeFlag()
try:
with DeviceTimeout(None, timeout_ms):
ble.gap_connect(device.addr_type, device.addr)
# Wait for the connected IRQ.
await connection._event.wait()
assert connection._conn_handle is not None
# Register connection handle -> device.
DeviceConnection._connected[connection._conn_handle] = connection
finally:
# After timeout, don't hold a reference and ignore future events.
_connecting.remove(device)
# Represents a single device that has been found during a scan. The scan
# iterator will return the same ScanResult instance multiple times as its data
# changes (i.e. changing RSSI or advertising data).
class ScanResult:
def __init__(self, device):
self.device = device
self.adv_data = None
self.resp_data = None
self.rssi = None
self.connectable = False
# New scan result available, return true if it changes our state.
def _update(self, adv_type, rssi, adv_data):
updated = False
if rssi != self.rssi:
self.rssi = rssi
updated = True
if adv_type in (_ADV_IND, _ADV_NONCONN_IND):
if adv_data != self.adv_data:
self.adv_data = adv_data
self.connectable = adv_type == _ADV_IND
updated = True
elif adv_type == _ADV_SCAN_IND:
if adv_data != self.adv_data and self.resp_data:
updated = True
self.adv_data = adv_data
elif adv_type == _SCAN_RSP and adv_data:
if adv_data != self.resp_data:
self.resp_data = adv_data
updated = True
return updated
def __str__(self):
return "Scan result: {} {}".format(self.device, self.rssi)
# Gets all the fields for the specified types.
def _decode_field(self, *adv_type):
# Advertising payloads are repeated packets of the following form:
# 1 byte data length (N + 1)
# 1 byte type (see constants below)
# N bytes type-specific data
for payload in (self.adv_data, self.resp_data):
if not payload:
continue
i = 0
while i + 1 < len(payload):
if payload[i + 1] in adv_type:
yield payload[i + 2 : i + payload[i] + 1]
i += 1 + payload[i]
# Returns the value of the complete (or shortened) advertised name, if available.
def name(self):
for n in self._decode_field(_ADV_TYPE_NAME, _ADV_TYPE_SHORT_NAME):
return str(n, "utf-8") if n else ""
# Generator that enumerates the service UUIDs that are advertised.
def services(self):
for u in self._decode_field(_ADV_TYPE_UUID16_INCOMPLETE, _ADV_TYPE_UUID16_COMPLETE):
yield bluetooth.UUID(struct.unpack("<H", u)[0])
for u in self._decode_field(_ADV_TYPE_UUID32_INCOMPLETE, _ADV_TYPE_UUID32_COMPLETE):
yield bluetooth.UUID(struct.unpack("<I", u)[0])
for u in self._decode_field(_ADV_TYPE_UUID128_INCOMPLETE, _ADV_TYPE_UUID128_COMPLETE):
yield bluetooth.UUID(u)
# Generator that returns (manufacturer_id, data) tuples.
def manufacturer(self, filter=None):
for u in self._decode_field(_ADV_TYPE_MANUFACTURER):
if len(u) < 2:
continue
m = struct.unpack("<H", u[0:2])[0]
if filter is None or m == filter:
yield (m, u[2:])
# Use with:
# async with aioble.scan(...) as scanner:
# async for result in scanner:
# ...
class scan:
def __init__(self, duration_ms, interval_us=None, window_us=None, active=False):
self._queue = []
self._event = asyncio.ThreadSafeFlag()
self._done = False
# Keep track of what we've already seen.
self._results = set()
# Ideally we'd start the scan here and avoid having to save these
# values, but we need to stop any previous scan first via awaiting
# _cancel_pending(), but __init__ isn't async.
self._duration_ms = duration_ms
self._interval_us = interval_us or 1280000
self._window_us = window_us or 11250
self._active = active
async def __aenter__(self):
global _active_scanner
ensure_active()
await _cancel_pending()
_active_scanner = self
ble.gap_scan(self._duration_ms, self._interval_us, self._window_us, self._active)
return self
async def __aexit__(self, exc_type, exc_val, exc_traceback):
# Cancel the current scan if we're still the active scanner. This will
# happen if the loop breaks early before the scan duration completes.
if _active_scanner == self:
await self.cancel()
def __aiter__(self):
assert _active_scanner == self
return self
async def __anext__(self):
global _active_scanner
if _active_scanner != self:
# The scan has been canceled (e.g. a connection was initiated).
raise StopAsyncIteration
while True:
while self._queue:
addr_type, addr, adv_type, rssi, adv_data = self._queue.pop()
# Try to find an existing ScanResult for this device.
for r in self._results:
if r.device.addr_type == addr_type and r.device.addr == addr:
result = r
break
else:
# New device, create a new Device & ScanResult.
device = Device(addr_type, addr)
result = ScanResult(device)
self._results.add(result)
# Add the new information from this event.
if result._update(adv_type, rssi, adv_data):
# It's new information, so re-yield this result.
return result
if self._done:
# _IRQ_SCAN_DONE event was fired.
_active_scanner = None
raise StopAsyncIteration
# Wait for either done or result IRQ.
await self._event.wait()
# Cancel any in-progress scan. We need to do this before starting any other operation.
async def cancel(self):
if self._done:
return
ble.gap_scan(None)
while not self._done:
await self._event.wait()
global _active_scanner
_active_scanner = None

456
robot/aioble/client.py Normal file
View File

@ -0,0 +1,456 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
from collections import deque
import uasyncio as asyncio
import struct
import bluetooth
from .core import ble, GattError, register_irq_handler
from .device import DeviceConnection
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_CCCD_UUID = const(0x2902)
_CCCD_NOTIFY = const(1)
_CCCD_INDICATE = const(2)
_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
# Forward IRQs directly to static methods on the type that handles them and
# knows how to map handles to instances. Note: We copy all uuid and data
# params here for safety, but a future optimisation might be able to avoid
# these copies in a few places.
def _client_irq(event, data):
if event == _IRQ_GATTC_SERVICE_RESULT:
conn_handle, start_handle, end_handle, uuid = data
ClientDiscover._discover_result(
conn_handle, start_handle, end_handle, bluetooth.UUID(uuid)
)
elif event == _IRQ_GATTC_SERVICE_DONE:
conn_handle, status = data
ClientDiscover._discover_done(conn_handle, status)
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
conn_handle, end_handle, value_handle, properties, uuid = data
ClientDiscover._discover_result(
conn_handle, end_handle, value_handle, properties, bluetooth.UUID(uuid)
)
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
conn_handle, status = data
ClientDiscover._discover_done(conn_handle, status)
elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
conn_handle, dsc_handle, uuid = data
ClientDiscover._discover_result(conn_handle, dsc_handle, bluetooth.UUID(uuid))
elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
conn_handle, status = data
ClientDiscover._discover_done(conn_handle, status)
elif event == _IRQ_GATTC_READ_RESULT:
conn_handle, value_handle, char_data = data
ClientCharacteristic._read_result(conn_handle, value_handle, bytes(char_data))
elif event == _IRQ_GATTC_READ_DONE:
conn_handle, value_handle, status = data
ClientCharacteristic._read_done(conn_handle, value_handle, status)
elif event == _IRQ_GATTC_WRITE_DONE:
conn_handle, value_handle, status = data
ClientCharacteristic._write_done(conn_handle, value_handle, status)
elif event == _IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data
ClientCharacteristic._on_notify(conn_handle, value_handle, bytes(notify_data))
elif event == _IRQ_GATTC_INDICATE:
conn_handle, value_handle, indicate_data = data
ClientCharacteristic._on_indicate(conn_handle, value_handle, bytes(indicate_data))
register_irq_handler(_client_irq, None)
# Async generator for discovering services, characteristics, descriptors.
class ClientDiscover:
def __init__(self, connection, disc_type, parent, timeout_ms, *args):
self._connection = connection
# Each result IRQ will append to this.
self._queue = []
# This will be set by the done IRQ.
self._status = None
# Tell the generator to process new events.
self._event = asyncio.ThreadSafeFlag()
# Must implement the _start_discovery static method. Instances of this
# type are returned by __anext__.
self._disc_type = disc_type
# This will be the connection for a service discovery, and the service for a characteristic discovery.
self._parent = parent
# Timeout for the discovery process.
# TODO: Not implemented.
self._timeout_ms = timeout_ms
# Additional arguments to pass to the _start_discovery method on disc_type.
self._args = args
async def _start(self):
if self._connection._discover:
# TODO: cancel existing? (e.g. perhaps they didn't let the loop run to completion)
raise ValueError("Discovery in progress")
# Tell the connection that we're the active discovery operation (the IRQ only gives us conn_handle).
self._connection._discover = self
# Call the appropriate ubluetooth.BLE method.
self._disc_type._start_discovery(self._parent, *self._args)
def __aiter__(self):
return self
async def __anext__(self):
if self._connection._discover != self:
# Start the discovery if necessary.
await self._start()
# Keep returning items from the queue until the status is set by the
# done IRQ.
while True:
while self._queue:
return self._disc_type(self._parent, *self._queue.pop())
if self._status is not None:
self._connection._discover = None
raise StopAsyncIteration
# Wait for more results to be added to the queue.
await self._event.wait()
# Tell the active discovery instance for this connection to add a new result
# to the queue.
def _discover_result(conn_handle, *args):
if connection := DeviceConnection._connected.get(conn_handle, None):
if discover := connection._discover:
discover._queue.append(args)
discover._event.set()
# Tell the active discovery instance for this connection that it is complete.
def _discover_done(conn_handle, status):
if connection := DeviceConnection._connected.get(conn_handle, None):
if discover := connection._discover:
discover._status = status
discover._event.set()
# Represents a single service supported by a connection. Do not construct this
# class directly, instead use `async for service in connection.services([uuid])` or
# `await connection.service(uuid)`.
class ClientService:
def __init__(self, connection, start_handle, end_handle, uuid):
self.connection = connection
# Used for characteristic discovery.
self._start_handle = start_handle
self._end_handle = end_handle
# Allows comparison to a known uuid.
self.uuid = uuid
def __str__(self):
return "Service: {} {} {}".format(self._start_handle, self._end_handle, self.uuid)
# Search for a specific characteristic by uuid.
async def characteristic(self, uuid, timeout_ms=2000):
result = None
# Make sure loop runs to completion.
async for characteristic in self.characteristics(uuid, timeout_ms):
if not result and characteristic.uuid == uuid:
# Keep first result.
result = characteristic
return result
# Search for all services (optionally by uuid).
# Use with `async for`, e.g.
# async for characteristic in service.characteristics():
# Note: must allow the loop to run to completion.
def characteristics(self, uuid=None, timeout_ms=2000):
return ClientDiscover(self.connection, ClientCharacteristic, self, timeout_ms, uuid)
# For ClientDiscover
def _start_discovery(connection, uuid=None):
ble.gattc_discover_services(connection._conn_handle, uuid)
class BaseClientCharacteristic:
def __init__(self, value_handle, properties, uuid):
# Used for read/write/notify ops.
self._value_handle = value_handle
# Which operations are supported.
self.properties = properties
# Allows comparison to a known uuid.
self.uuid = uuid
if properties & _FLAG_READ:
# Fired for each read result and read done IRQ.
self._read_event = None
self._read_data = None
# Used to indicate that the read is complete.
self._read_status = None
if (properties & _FLAG_WRITE) or (properties & _FLAG_WRITE_NO_RESPONSE):
# Fired for the write done IRQ.
self._write_event = None
# Used to indicate that the write is complete.
self._write_status = None
# Register this value handle so events can find us.
def _register_with_connection(self):
self._connection()._characteristics[self._value_handle] = self
# Map an incoming IRQ to an registered characteristic.
def _find(conn_handle, value_handle):
if connection := DeviceConnection._connected.get(conn_handle, None):
if characteristic := connection._characteristics.get(value_handle, None):
return characteristic
else:
# IRQ for a characteristic that we weren't expecting. e.g.
# notification when we're not waiting on notified().
# TODO: This will happen on btstack, which doesn't give us
# value handle for the done event.
return None
def _check(self, flag):
if not (self.properties & flag):
raise ValueError("Unsupported")
# Issue a read to the characteristic.
async def read(self, timeout_ms=1000):
self._check(_FLAG_READ)
# Make sure this conn_handle/value_handle is known.
self._register_with_connection()
# This will be set by the done IRQ.
self._read_status = None
# This will be set by the result and done IRQs. Re-use if possible.
self._read_event = self._read_event or asyncio.ThreadSafeFlag()
# Issue the read.
ble.gattc_read(self._connection()._conn_handle, self._value_handle)
with self._connection().timeout(timeout_ms):
# The event will be set for each read result, then a final time for done.
while self._read_status is None:
await self._read_event.wait()
if self._read_status != 0:
raise GattError(self._read_status)
return self._read_data
# Map an incoming result IRQ to a registered characteristic.
def _read_result(conn_handle, value_handle, data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._read_data = data
characteristic._read_event.set()
# Map an incoming read done IRQ to a registered characteristic.
def _read_done(conn_handle, value_handle, status):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._read_status = status
characteristic._read_event.set()
async def write(self, data, response=None, timeout_ms=1000):
self._check(_FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE)
# If the response arg is unset, then default it to true if we only support write-with-response.
if response is None:
p = self.properties
response = (p & _FLAG_WRITE) and not (p & _FLAG_WRITE_NO_RESPONSE)
if response:
# Same as read.
self._register_with_connection()
self._write_status = None
self._write_event = self._write_event or asyncio.ThreadSafeFlag()
# Issue the write.
ble.gattc_write(self._connection()._conn_handle, self._value_handle, data, response)
if response:
with self._connection().timeout(timeout_ms):
# The event will be set for the write done IRQ.
await self._write_event.wait()
if self._write_status != 0:
raise GattError(self._write_status)
# Map an incoming write done IRQ to a registered characteristic.
def _write_done(conn_handle, value_handle, status):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._write_status = status
characteristic._write_event.set()
# Represents a single characteristic supported by a service. Do not construct
# this class directly, instead use `async for characteristic in
# service.characteristics([uuid])` or `await service.characteristic(uuid)`.
class ClientCharacteristic(BaseClientCharacteristic):
def __init__(self, service, end_handle, value_handle, properties, uuid):
self.service = service
self.connection = service.connection
# Used for descriptor discovery. If available, otherwise assume just
# past the value handle (enough for two descriptors without risking
# going into the next characteristic).
self._end_handle = end_handle if end_handle > value_handle else value_handle + 2
super().__init__(value_handle, properties, uuid)
if properties & _FLAG_NOTIFY:
# Fired when a notification arrives.
self._notify_event = asyncio.ThreadSafeFlag()
# Data for the most recent notification.
self._notify_queue = deque((), 1)
if properties & _FLAG_INDICATE:
# Same for indications.
self._indicate_event = asyncio.ThreadSafeFlag()
self._indicate_queue = deque((), 1)
def __str__(self):
return "Characteristic: {} {} {} {}".format(
self._end_handle, self._value_handle, self.properties, self.uuid
)
def _connection(self):
return self.service.connection
# Search for a specific descriptor by uuid.
async def descriptor(self, uuid, timeout_ms=2000):
result = None
# Make sure loop runs to completion.
async for descriptor in self.descriptors(timeout_ms):
if not result and descriptor.uuid == uuid:
# Keep first result.
result = descriptor
return result
# Search for all services (optionally by uuid).
# Use with `async for`, e.g.
# async for descriptor in characteristic.descriptors():
# Note: must allow the loop to run to completion.
def descriptors(self, timeout_ms=2000):
return ClientDiscover(self.connection, ClientDescriptor, self, timeout_ms)
# For ClientDiscover
def _start_discovery(service, uuid=None):
ble.gattc_discover_characteristics(
service.connection._conn_handle,
service._start_handle,
service._end_handle,
uuid,
)
# Helper for notified() and indicated().
async def _notified_indicated(self, queue, event, timeout_ms):
# Ensure that events for this connection can route to this characteristic.
self._register_with_connection()
# If the queue is empty, then we need to wait. However, if the queue
# has a single item, we also need to do a no-op wait in order to
# clear the event flag (because the queue will become empty and
# therefore the event should be cleared).
if len(queue) <= 1:
with self._connection().timeout(timeout_ms):
await event.wait()
# Either we started > 1 item, or the wait completed successfully, return
# the front of the queue.
return queue.popleft()
# Wait for the next notification.
# Will return immediately if a notification has already been received.
async def notified(self, timeout_ms=None):
self._check(_FLAG_NOTIFY)
return await self._notified_indicated(self._notify_queue, self._notify_event, timeout_ms)
def _on_notify_indicate(self, queue, event, data):
# If we've gone from empty to one item, then wake something
# blocking on `await char.notified()` (or `await char.indicated()`).
wake = len(queue) == 0
# Append the data. By default this is a deque with max-length==1, so it
# replaces. But if capture is enabled then it will append.
queue.append(data)
if wake:
# Queue is now non-empty. If something is waiting, it will be
# worken. If something isn't waiting right now, then a future
# caller to `await char.written()` will see the queue is
# non-empty, and wait on the event if it's going to empty the
# queue.
event.set()
# Map an incoming notify IRQ to a registered characteristic.
def _on_notify(conn_handle, value_handle, notify_data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._on_notify_indicate(
characteristic._notify_queue, characteristic._notify_event, notify_data
)
# Wait for the next indication.
# Will return immediately if an indication has already been received.
async def indicated(self, timeout_ms=None):
self._check(_FLAG_INDICATE)
return await self._notified_indicated(
self._indicate_queue, self._indicate_event, timeout_ms
)
# Map an incoming indicate IRQ to a registered characteristic.
def _on_indicate(conn_handle, value_handle, indicate_data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._on_notify_indicate(
characteristic._indicate_queue, characteristic._indicate_event, indicate_data
)
# Write to the Client Characteristic Configuration to subscribe to
# notify/indications for this characteristic.
async def subscribe(self, notify=True, indicate=False):
# Ensure that the generated notifications are dispatched in case the app
# hasn't awaited on notified/indicated yet.
self._register_with_connection()
if cccd := await self.descriptor(bluetooth.UUID(_CCCD_UUID)):
await cccd.write(struct.pack("<H", _CCCD_NOTIFY * notify + _CCCD_INDICATE * indicate))
else:
raise ValueError("CCCD not found")
# Represents a single descriptor supported by a characteristic. Do not construct
# this class directly, instead use `async for descriptors in
# characteristic.descriptors([uuid])` or `await characteristic.descriptor(uuid)`.
class ClientDescriptor(BaseClientCharacteristic):
def __init__(self, characteristic, dsc_handle, uuid):
self.characteristic = characteristic
super().__init__(dsc_handle, _FLAG_READ | _FLAG_WRITE_NO_RESPONSE, uuid)
def __str__(self):
return "Descriptor: {} {} {}".format(self._value_handle, self.properties, self.uuid)
def _connection(self):
return self.characteristic.service.connection
# For ClientDiscover
def _start_discovery(characteristic, uuid=None):
ble.gattc_discover_descriptors(
characteristic._connection()._conn_handle,
characteristic._value_handle,
characteristic._end_handle,
)

78
robot/aioble/core.py Normal file
View File

@ -0,0 +1,78 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
import bluetooth
log_level = 1
def log_error(*args):
if log_level > 0:
print("[aioble] E:", *args)
def log_warn(*args):
if log_level > 1:
print("[aioble] W:", *args)
def log_info(*args):
if log_level > 2:
print("[aioble] I:", *args)
class GattError(Exception):
def __init__(self, status):
self._status = status
def ensure_active():
if not ble.active():
try:
from .security import load_secrets
load_secrets()
except:
pass
ble.active(True)
def config(*args, **kwargs):
ensure_active()
return ble.config(*args, **kwargs)
# Because different functionality is enabled by which files are available the
# different modules can register their IRQ handlers and shutdown handlers
# dynamically.
_irq_handlers = []
_shutdown_handlers = []
def register_irq_handler(irq, shutdown):
if irq:
_irq_handlers.append(irq)
if shutdown:
_shutdown_handlers.append(shutdown)
def stop():
ble.active(False)
for handler in _shutdown_handlers:
handler()
# Dispatch IRQs to the registered sub-modules.
def ble_irq(event, data):
log_info(event, data)
for handler in _irq_handlers:
result = handler(event, data)
if result is not None:
return result
# TODO: Allow this to be injected.
ble = bluetooth.BLE()
ble.irq(ble_irq)

295
robot/aioble/device.py Normal file
View File

@ -0,0 +1,295 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import uasyncio as asyncio
import binascii
from .core import ble, register_irq_handler, log_error
_IRQ_MTU_EXCHANGED = const(21)
# Raised by `with device.timeout()`.
class DeviceDisconnectedError(Exception):
pass
def _device_irq(event, data):
if event == _IRQ_MTU_EXCHANGED:
conn_handle, mtu = data
if device := DeviceConnection._connected.get(conn_handle, None):
device.mtu = mtu
if device._mtu_event:
device._mtu_event.set()
register_irq_handler(_device_irq, None)
# Context manager to allow an operation to be cancelled by timeout or device
# disconnection. Don't use this directly -- use `with connection.timeout(ms):`
# instead.
class DeviceTimeout:
def __init__(self, connection, timeout_ms):
self._connection = connection
self._timeout_ms = timeout_ms
# We allow either (or both) connection and timeout_ms to be None. This
# allows this to be used either as a just-disconnect, just-timeout, or
# no-op.
# This task is active while the operation is in progress. It sleeps
# until the timeout, and then cancels the working task. If the working
# task completes, __exit__ will cancel the sleep.
self._timeout_task = None
# This is the task waiting for the actual operation to complete.
# Usually this is waiting on an event that will be set() by an IRQ
# handler.
self._task = asyncio.current_task()
# Tell the connection that if it disconnects, it should cancel this
# operation (by cancelling self._task).
if connection:
connection._timeouts.append(self)
async def _timeout_sleep(self):
try:
await asyncio.sleep_ms(self._timeout_ms)
except asyncio.CancelledError:
# The operation completed successfully and this timeout task was
# cancelled by __exit__.
return
# The sleep completed, so we should trigger the timeout. Set
# self._timeout_task to None so that we can tell the difference
# between a disconnect and a timeout in __exit__.
self._timeout_task = None
self._task.cancel()
def __enter__(self):
if self._timeout_ms:
# Schedule the timeout waiter.
self._timeout_task = asyncio.create_task(self._timeout_sleep())
def __exit__(self, exc_type, exc_val, exc_traceback):
# One of five things happened:
# 1 - The operation completed successfully.
# 2 - The operation timed out.
# 3 - The device disconnected.
# 4 - The operation failed for a different exception.
# 5 - The task was cancelled by something else.
# Don't need the connection to tell us about disconnection anymore.
if self._connection:
self._connection._timeouts.remove(self)
try:
if exc_type == asyncio.CancelledError:
# Case 2, we started a timeout and it's completed.
if self._timeout_ms and self._timeout_task is None:
raise asyncio.TimeoutError
# Case 3, we have a disconnected device.
if self._connection and self._connection._conn_handle is None:
raise DeviceDisconnectedError
# Case 5, something else cancelled us.
# Allow the cancellation to propagate.
return
# Case 1 & 4. Either way, just stop the timeout task and let the
# exception (if case 4) propagate.
finally:
# In all cases, if the timeout is still running, cancel it.
if self._timeout_task:
self._timeout_task.cancel()
class Device:
def __init__(self, addr_type, addr):
# Public properties
self.addr_type = addr_type
self.addr = addr if len(addr) == 6 else binascii.unhexlify(addr.replace(":", ""))
self._connection = None
def __eq__(self, rhs):
return self.addr_type == rhs.addr_type and self.addr == rhs.addr
def __hash__(self):
return hash((self.addr_type, self.addr))
def __str__(self):
return "Device({}, {}{})".format(
"ADDR_PUBLIC" if self.addr_type == 0 else "ADDR_RANDOM",
self.addr_hex(),
", CONNECTED" if self._connection else "",
)
def addr_hex(self):
return binascii.hexlify(self.addr, ":").decode()
async def connect(self, timeout_ms=10000):
if self._connection:
return self._connection
# Forward to implementation in central.py.
from .central import _connect
await _connect(DeviceConnection(self), timeout_ms)
# Start the device task that will clean up after disconnection.
self._connection._run_task()
return self._connection
class DeviceConnection:
# Global map of connection handle to active devices (for IRQ mapping).
_connected = {}
def __init__(self, device):
self.device = device
device._connection = self
self.encrypted = False
self.authenticated = False
self.bonded = False
self.key_size = False
self.mtu = None
self._conn_handle = None
# This event is fired by the IRQ both for connection and disconnection
# and controls the device_task.
self._event = None
# If we're waiting for a pending MTU exchange.
self._mtu_event = None
# In-progress client discovery instance (e.g. services, chars,
# descriptors) used for IRQ mapping.
self._discover = None
# Map of value handle to characteristic (so that IRQs with
# conn_handle,value_handle can route to them). See
# ClientCharacteristic._find for where this is used.
self._characteristics = {}
self._task = None
# DeviceTimeout instances that are currently waiting on this device
# and need to be notified if disconnection occurs.
self._timeouts = []
# Fired by the encryption update event.
self._pair_event = None
# Active L2CAP channel for this device.
# TODO: Support more than one concurrent channel.
self._l2cap_channel = None
# While connected, this tasks waits for disconnection then cleans up.
async def device_task(self):
assert self._conn_handle is not None
# Wait for the (either central or peripheral) disconnected irq.
await self._event.wait()
# Mark the device as disconnected.
del DeviceConnection._connected[self._conn_handle]
self._conn_handle = None
self.device._connection = None
# Cancel any in-progress operations on this device.
for t in self._timeouts:
t._task.cancel()
def _run_task(self):
# Event will be already created this if we initiated connection.
self._event = self._event or asyncio.ThreadSafeFlag()
self._task = asyncio.create_task(self.device_task())
async def disconnect(self, timeout_ms=2000):
await self.disconnected(timeout_ms, disconnect=True)
async def disconnected(self, timeout_ms=60000, disconnect=False):
if not self.is_connected():
return
# The task must have been created after successful connection.
assert self._task
if disconnect:
try:
ble.gap_disconnect(self._conn_handle)
except OSError as e:
log_error("Disconnect", e)
with DeviceTimeout(None, timeout_ms):
await self._task
# Retrieve a single service matching this uuid.
async def service(self, uuid, timeout_ms=2000):
result = None
# Make sure loop runs to completion.
async for service in self.services(uuid, timeout_ms):
if not result and service.uuid == uuid:
result = service
return result
# Search for all services (optionally by uuid).
# Use with `async for`, e.g.
# async for service in device.services():
# Note: must allow the loop to run to completion.
# TODO: disconnection / timeout
def services(self, uuid=None, timeout_ms=2000):
from .client import ClientDiscover, ClientService
return ClientDiscover(self, ClientService, self, timeout_ms, uuid)
async def pair(self, *args, **kwargs):
from .security import pair
await pair(self, *args, **kwargs)
def is_connected(self):
return self._conn_handle is not None
# Use with `with` to simplify disconnection and timeout handling.
def timeout(self, timeout_ms):
return DeviceTimeout(self, timeout_ms)
async def exchange_mtu(self, mtu=None, timeout_ms=1000):
if not self.is_connected():
raise ValueError("Not connected")
if mtu:
ble.config(mtu=mtu)
self._mtu_event = self._mtu_event or asyncio.ThreadSafeFlag()
ble.gattc_exchange_mtu(self._conn_handle)
with self.timeout(timeout_ms):
await self._mtu_event.wait()
return self.mtu
# Wait for a connection on an L2CAP connection-oriented-channel.
async def l2cap_accept(self, psm, mtu, timeout_ms=None):
from .l2cap import accept
return await accept(self, psm, mtu, timeout_ms)
# Attempt to connect to a listening device.
async def l2cap_connect(self, psm, mtu, timeout_ms=1000):
from .l2cap import connect
return await connect(self, psm, mtu, timeout_ms)
# Context manager -- automatically disconnect.
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_traceback):
await self.disconnect()

214
robot/aioble/l2cap.py Normal file
View File

@ -0,0 +1,214 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import uasyncio as asyncio
from .core import ble, log_error, register_irq_handler
from .device import DeviceConnection
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)
# Once we start listening we're listening forever. (Limitation in NimBLE)
_listening = False
def _l2cap_irq(event, data):
if event not in (
_IRQ_L2CAP_CONNECT,
_IRQ_L2CAP_DISCONNECT,
_IRQ_L2CAP_RECV,
_IRQ_L2CAP_SEND_READY,
):
return
# All the L2CAP events start with (conn_handle, cid, ...)
if connection := DeviceConnection._connected.get(data[0], None):
if channel := connection._l2cap_channel:
# Expect to match the cid for this conn handle (unless we're
# waiting for connection in which case channel._cid is None).
if channel._cid is not None and channel._cid != data[1]:
return
# Update the channel object with new information.
if event == _IRQ_L2CAP_CONNECT:
_, channel._cid, _, channel.our_mtu, channel.peer_mtu = data
elif event == _IRQ_L2CAP_DISCONNECT:
_, _, psm, status = data
channel._status = status
channel._cid = None
connection._l2cap_channel = None
elif event == _IRQ_L2CAP_RECV:
channel._data_ready = True
elif event == _IRQ_L2CAP_SEND_READY:
channel._stalled = False
# Notify channel.
channel._event.set()
def _l2cap_shutdown():
global _listening
_listening = False
register_irq_handler(_l2cap_irq, _l2cap_shutdown)
# The channel was disconnected during a send/recvinto/flush.
class L2CAPDisconnectedError(Exception):
pass
# Failed to connect to connection (argument is status).
class L2CAPConnectionError(Exception):
pass
class L2CAPChannel:
def __init__(self, connection):
if not connection.is_connected():
raise ValueError("Not connected")
if connection._l2cap_channel:
raise ValueError("Already has channel")
connection._l2cap_channel = self
self._connection = connection
# Maximum size that the other side can send to us.
self.our_mtu = 0
# Maximum size that we can send.
self.peer_mtu = 0
# Set back to None on disconnection.
self._cid = None
# Set during disconnection.
self._status = 0
# If true, must wait for _IRQ_L2CAP_SEND_READY IRQ before sending.
self._stalled = False
# Has received a _IRQ_L2CAP_RECV since the buffer was last emptied.
self._data_ready = False
self._event = asyncio.ThreadSafeFlag()
def _assert_connected(self):
if self._cid is None:
raise L2CAPDisconnectedError
async def recvinto(self, buf, timeout_ms=None):
self._assert_connected()
# Wait until the data_ready flag is set. This flag is only ever set by
# the event and cleared by this function.
with self._connection.timeout(timeout_ms):
while not self._data_ready:
await self._event.wait()
self._assert_connected()
self._assert_connected()
# Extract up to len(buf) bytes from the channel buffer.
n = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, buf)
# Check if there's still remaining data in the channel buffers.
self._data_ready = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, None) > 0
return n
# Synchronously see if there's data ready.
def available(self):
self._assert_connected()
return self._data_ready
# Waits until the channel is free and then sends buf.
# If the buffer is larger than the MTU it will be sent in chunks.
async def send(self, buf, timeout_ms=None, chunk_size=None):
self._assert_connected()
offset = 0
chunk_size = min(self.our_mtu * 2, self.peer_mtu, chunk_size or self.peer_mtu)
mv = memoryview(buf)
while offset < len(buf):
if self._stalled:
await self.flush(timeout_ms)
# l2cap_send returns True if you can send immediately.
self._stalled = not ble.l2cap_send(
self._connection._conn_handle,
self._cid,
mv[offset : offset + chunk_size],
)
offset += chunk_size
async def flush(self, timeout_ms=None):
self._assert_connected()
# Wait for the _stalled flag to be cleared by the IRQ.
with self._connection.timeout(timeout_ms):
while self._stalled:
await self._event.wait()
self._assert_connected()
async def disconnect(self, timeout_ms=1000):
if self._cid is None:
return
# Wait for the cid to be cleared by the disconnect IRQ.
ble.l2cap_disconnect(self._connection._conn_handle, self._cid)
await self.disconnected(timeout_ms)
async def disconnected(self, timeout_ms=1000):
with self._connection.timeout(timeout_ms):
while self._cid is not None:
await self._event.wait()
# Context manager -- automatically disconnect.
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_traceback):
await self.disconnect()
# Use connection.l2cap_accept() instead of calling this directly.
async def accept(connection, psm, mtu, timeout_ms):
global _listening
channel = L2CAPChannel(connection)
# Start the stack listening if necessary.
if not _listening:
ble.l2cap_listen(psm, mtu)
_listening = True
# Wait for the connect irq from the remote connection.
with connection.timeout(timeout_ms):
await channel._event.wait()
return channel
# Use connection.l2cap_connect() instead of calling this directly.
async def connect(connection, psm, mtu, timeout_ms):
if _listening:
raise ValueError("Can't connect while listening")
channel = L2CAPChannel(connection)
with connection.timeout(timeout_ms):
ble.l2cap_connect(connection._conn_handle, psm, mtu)
# Wait for the connect irq from the remote connection.
# If the connection fails, we get a disconnect event (with status) instead.
await channel._event.wait()
if channel._cid is not None:
return channel
else:
raise L2CAPConnectionError(channel._status)

179
robot/aioble/peripheral.py Normal file
View File

@ -0,0 +1,179 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
import bluetooth
import struct
import uasyncio as asyncio
from .core import (
ensure_active,
ble,
log_info,
log_error,
log_warn,
register_irq_handler,
)
from .device import Device, DeviceConnection, DeviceTimeout
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
_ADV_TYPE_MANUFACTURER = const(0xFF)
_ADV_PAYLOAD_MAX_LEN = const(31)
_incoming_connection = None
_connect_event = None
def _peripheral_irq(event, data):
global _incoming_connection
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, addr_type, addr = data
# Create, initialise, and register the device.
device = Device(addr_type, bytes(addr))
_incoming_connection = DeviceConnection(device)
_incoming_connection._conn_handle = conn_handle
DeviceConnection._connected[conn_handle] = _incoming_connection
# Signal advertise() to return the connected device.
_connect_event.set()
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _ = data
if connection := DeviceConnection._connected.get(conn_handle, None):
# Tell the device_task that it should terminate.
connection._event.set()
def _peripheral_shutdown():
global _incoming_connection, _connect_event
_incoming_connection = None
_connect_event = None
register_irq_handler(_peripheral_irq, _peripheral_shutdown)
# Advertising payloads are repeated packets of the following form:
# 1 byte data length (N + 1)
# 1 byte type (see constants below)
# N bytes type-specific data
def _append(adv_data, resp_data, adv_type, value):
data = struct.pack("BB", len(value) + 1, adv_type) + value
if len(data) + len(adv_data) < _ADV_PAYLOAD_MAX_LEN:
adv_data += data
return resp_data
if len(data) + (len(resp_data) if resp_data else 0) < _ADV_PAYLOAD_MAX_LEN:
if not resp_data:
# Overflow into resp_data for the first time.
resp_data = bytearray()
resp_data += data
return resp_data
raise ValueError("Advertising payload too long")
async def advertise(
interval_us,
adv_data=None,
resp_data=None,
connectable=True,
limited_disc=False,
br_edr=False,
name=None,
services=None,
appearance=0,
manufacturer=None,
timeout_ms=None,
):
global _incoming_connection, _connect_event
ensure_active()
if not adv_data and not resp_data:
# If the user didn't manually specify adv_data / resp_data then
# construct them from the kwargs. Keep adding fields to adv_data,
# overflowing to resp_data if necessary.
# TODO: Try and do better bin-packing than just concatenating in
# order?
adv_data = bytearray()
resp_data = _append(
adv_data,
resp_data,
_ADV_TYPE_FLAGS,
struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
)
# Services are prioritised to go in the advertising data because iOS supports
# filtering scan results by service only, so services must come first.
if services:
for uuid in services:
b = bytes(uuid)
if len(b) == 2:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID16_COMPLETE, b)
elif len(b) == 4:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID32_COMPLETE, b)
elif len(b) == 16:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID128_COMPLETE, b)
if name:
resp_data = _append(adv_data, resp_data, _ADV_TYPE_NAME, name)
if appearance:
# See org.bluetooth.characteristic.gap.appearance.xml
resp_data = _append(
adv_data, resp_data, _ADV_TYPE_APPEARANCE, struct.pack("<H", appearance)
)
if manufacturer:
resp_data = _append(
adv_data,
resp_data,
_ADV_TYPE_MANUFACTURER,
struct.pack("<H", manufacturer[0]) + manufacturer[1],
)
_connect_event = _connect_event or asyncio.ThreadSafeFlag()
ble.gap_advertise(interval_us, adv_data=adv_data, resp_data=resp_data, connectable=connectable)
try:
# Allow optional timeout for a central to connect to us (or just to stop advertising).
with DeviceTimeout(None, timeout_ms):
await _connect_event.wait()
# Get the newly connected connection to the central and start a task
# to wait for disconnection.
result = _incoming_connection
_incoming_connection = None
# This mirrors what connecting to a central does.
result._run_task()
return result
except asyncio.CancelledError:
# Something else cancelled this task (to manually stop advertising).
ble.gap_advertise(None)
except asyncio.TimeoutError:
# DeviceTimeout waiting for connection.
ble.gap_advertise(None)
raise

178
robot/aioble/security.py Normal file
View File

@ -0,0 +1,178 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const, schedule
import uasyncio as asyncio
import binascii
import json
from .core import log_info, log_warn, ble, register_irq_handler
from .device import DeviceConnection
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)
_IRQ_PASSKEY_ACTION = const(31)
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISP = const(3)
_PASSKEY_ACTION_NUMCMP = const(4)
_DEFAULT_PATH = "ble_secrets.json"
_secrets = {}
_modified = False
_path = None
# Must call this before stack startup.
def load_secrets(path=None):
global _path, _secrets
# Use path if specified, otherwise use previous path, otherwise use
# default path.
_path = path or _path or _DEFAULT_PATH
# Reset old secrets.
_secrets = {}
try:
with open(_path, "r") as f:
entries = json.load(f)
for sec_type, key, value in entries:
# Decode bytes from hex.
_secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
except:
log_warn("No secrets available")
# Call this whenever the secrets dict changes.
def _save_secrets(arg=None):
global _modified, _path
_path = _path or _DEFAULT_PATH
if not _modified:
# Only save if the secrets changed.
return
with open(_path, "w") as f:
# Convert bytes to hex strings (otherwise JSON will treat them like
# strings).
json_secrets = [
(sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
for (sec_type, key), value in _secrets.items()
]
json.dump(json_secrets, f)
_modified = False
def _security_irq(event, data):
global _modified
if event == _IRQ_ENCRYPTION_UPDATE:
# Connection has updated (usually due to pairing).
conn_handle, encrypted, authenticated, bonded, key_size = data
log_info("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
if connection := DeviceConnection._connected.get(conn_handle, None):
connection.encrypted = encrypted
connection.authenticated = authenticated
connection.bonded = bonded
connection.key_size = key_size
# TODO: Handle failure.
if encrypted and connection._pair_event:
connection._pair_event.set()
elif event == _IRQ_SET_SECRET:
sec_type, key, value = data
key = sec_type, bytes(key)
value = bytes(value) if value else None
log_info("set secret:", key, value)
if value is None:
# Delete secret.
if key not in _secrets:
return False
del _secrets[key]
else:
# Save secret.
_secrets[key] = value
# Queue up a save (don't synchronously write to flash).
_modified = True
schedule(_save_secrets, None)
return True
elif event == _IRQ_GET_SECRET:
sec_type, index, key = data
log_info("get secret:", sec_type, index, bytes(key) if key else None)
if key is None:
# Return the index'th secret of this type.
i = 0
for (t, _key), value in _secrets.items():
if t == sec_type:
if i == index:
return value
i += 1
return None
else:
# Return the secret for this key (or None).
key = sec_type, bytes(key)
return _secrets.get(key, None)
elif event == _IRQ_PASSKEY_ACTION:
conn_handle, action, passkey = data
log_info("passkey action", conn_handle, action, passkey)
# if action == _PASSKEY_ACTION_NUMCMP:
# # TODO: Show this passkey and confirm accept/reject.
# accept = 1
# self._ble.gap_passkey(conn_handle, action, accept)
# elif action == _PASSKEY_ACTION_DISP:
# # TODO: Generate and display a passkey so the remote device can enter it.
# passkey = 123456
# self._ble.gap_passkey(conn_handle, action, passkey)
# elif action == _PASSKEY_ACTION_INPUT:
# # TODO: Ask the user to enter the passkey shown on the remote device.
# passkey = 123456
# self._ble.gap_passkey(conn_handle, action, passkey)
# else:
# log_warn("unknown passkey action")
def _security_shutdown():
global _secrets, _modified, _path
_secrets = {}
_modified = False
_path = None
register_irq_handler(_security_irq, _security_shutdown)
# Use device.pair() rather than calling this directly.
async def pair(
connection,
bond=True,
le_secure=True,
mitm=False,
io=_IO_CAPABILITY_NO_INPUT_OUTPUT,
timeout_ms=20000,
):
ble.config(bond=bond, le_secure=le_secure, mitm=mitm, io=io)
with connection.timeout(timeout_ms):
connection._pair_event = asyncio.ThreadSafeFlag()
ble.gap_pair(connection._conn_handle)
await connection._pair_event.wait()
# TODO: Allow the passkey action to return to here and
# invoke a callback or task to process the action.

344
robot/aioble/server.py Normal file
View File

@ -0,0 +1,344 @@
# MicroPython aioble module
# MIT license; Copyright (c) 2021 Jim Mussared
from micropython import const
from collections import deque
import bluetooth
import uasyncio as asyncio
from .core import (
ensure_active,
ble,
log_info,
log_error,
log_warn,
register_irq_handler,
GattError,
)
from .device import DeviceConnection, DeviceTimeout
_registered_characteristics = {}
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_GATTS_INDICATE_DONE = const(20)
_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
_FLAG_READ_ENCRYPTED = const(0x0200)
_FLAG_READ_AUTHENTICATED = const(0x0400)
_FLAG_READ_AUTHORIZED = const(0x0800)
_FLAG_WRITE_ENCRYPTED = const(0x1000)
_FLAG_WRITE_AUTHENTICATED = const(0x2000)
_FLAG_WRITE_AUTHORIZED = const(0x4000)
_FLAG_WRITE_CAPTURE = const(0x10000)
_FLAG_DESC_READ = const(1)
_FLAG_DESC_WRITE = const(2)
_WRITE_CAPTURE_QUEUE_LIMIT = const(10)
def _server_irq(event, data):
if event == _IRQ_GATTS_WRITE:
conn_handle, attr_handle = data
Characteristic._remote_write(conn_handle, attr_handle)
elif event == _IRQ_GATTS_READ_REQUEST:
conn_handle, attr_handle = data
return Characteristic._remote_read(conn_handle, attr_handle)
elif event == _IRQ_GATTS_INDICATE_DONE:
conn_handle, value_handle, status = data
Characteristic._indicate_done(conn_handle, value_handle, status)
def _server_shutdown():
global _registered_characteristics
_registered_characteristics = {}
if hasattr(BaseCharacteristic, "_capture_task"):
BaseCharacteristic._capture_task.cancel()
del BaseCharacteristic._capture_queue
del BaseCharacteristic._capture_write_event
del BaseCharacteristic._capture_consumed_event
del BaseCharacteristic._capture_task
register_irq_handler(_server_irq, _server_shutdown)
class Service:
def __init__(self, uuid):
self.uuid = uuid
self.characteristics = []
# Generate tuple for gatts_register_services.
def _tuple(self):
return (self.uuid, tuple(c._tuple() for c in self.characteristics))
class BaseCharacteristic:
def _register(self, value_handle):
self._value_handle = value_handle
_registered_characteristics[value_handle] = self
if self._initial is not None:
self.write(self._initial)
self._initial = None
# Read value from local db.
def read(self):
if self._value_handle is None:
return self._initial or b""
else:
return ble.gatts_read(self._value_handle)
# Write value to local db, and optionally notify/indicate subscribers.
def write(self, data, send_update=False):
if self._value_handle is None:
self._initial = data
else:
ble.gatts_write(self._value_handle, data, send_update)
# When the a capture-enabled characteristic is created, create the
# necessary events (if not already created).
@staticmethod
def _init_capture():
if hasattr(BaseCharacteristic, "_capture_queue"):
return
BaseCharacteristic._capture_queue = deque((), _WRITE_CAPTURE_QUEUE_LIMIT)
BaseCharacteristic._capture_write_event = asyncio.ThreadSafeFlag()
BaseCharacteristic._capture_consumed_event = asyncio.ThreadSafeFlag()
BaseCharacteristic._capture_task = asyncio.create_task(
BaseCharacteristic._run_capture_task()
)
# Monitor the shared queue for incoming characteristic writes and forward
# them sequentially to the individual characteristic events.
@staticmethod
async def _run_capture_task():
write = BaseCharacteristic._capture_write_event
consumed = BaseCharacteristic._capture_consumed_event
q = BaseCharacteristic._capture_queue
while True:
if len(q):
conn, data, characteristic = q.popleft()
# Let the characteristic waiting in `written()` know that it
# can proceed.
characteristic._write_data = (conn, data)
characteristic._write_event.set()
# Wait for the characteristic to complete `written()` before
# continuing.
await consumed.wait()
if not len(q):
await write.wait()
# Wait for a write on this characteristic. Returns the connection that did
# the write, or a tuple of (connection, value) if capture is enabled for
# this characteristics.
async def written(self, timeout_ms=None):
if not hasattr(self, "_write_event"):
# Not a writable characteristic.
return
# If no write has been seen then we need to wait. If the event has
# already been set this will clear the event and continue
# immediately. In regular mode, this is set by the write IRQ
# directly (in _remote_write). In capture mode, this is set when it's
# our turn by _capture_task.
with DeviceTimeout(None, timeout_ms):
await self._write_event.wait()
# Return the write data and clear the stored copy.
# In default usage this will be just the connection handle.
# In capture mode this will be a tuple of (connection_handle, received_data)
data = self._write_data
self._write_data = None
if self.flags & _FLAG_WRITE_CAPTURE:
# Notify the shared queue monitor that the event has been consumed
# by the caller to `written()` and another characteristic can now
# proceed.
BaseCharacteristic._capture_consumed_event.set()
return data
def on_read(self, connection):
return 0
def _remote_write(conn_handle, value_handle):
if characteristic := _registered_characteristics.get(value_handle, None):
# If we've gone from empty to one item, then wake something
# blocking on `await char.written()`.
conn = DeviceConnection._connected.get(conn_handle, None)
if characteristic.flags & _FLAG_WRITE_CAPTURE:
# For capture, we append the connection and the written value
# value to the shared queue along with the matching characteristic object.
# The deque will enforce the max queue len.
data = characteristic.read()
BaseCharacteristic._capture_queue.append((conn, data, characteristic))
BaseCharacteristic._capture_write_event.set()
else:
# Store the write connection handle to be later used to retrieve the data
# then set event to handle in written() task.
characteristic._write_data = conn
characteristic._write_event.set()
def _remote_read(conn_handle, value_handle):
if characteristic := _registered_characteristics.get(value_handle, None):
return characteristic.on_read(DeviceConnection._connected.get(conn_handle, None))
class Characteristic(BaseCharacteristic):
def __init__(
self,
service,
uuid,
read=False,
write=False,
write_no_response=False,
notify=False,
indicate=False,
initial=None,
capture=False,
):
service.characteristics.append(self)
self.descriptors = []
flags = 0
if read:
flags |= _FLAG_READ
if write or write_no_response:
flags |= (_FLAG_WRITE if write else 0) | (
_FLAG_WRITE_NO_RESPONSE if write_no_response else 0
)
if capture:
# Capture means that we keep track of all writes, and capture
# their values (and connection) in a queue. Otherwise we just
# track the connection of the most recent write.
flags |= _FLAG_WRITE_CAPTURE
BaseCharacteristic._init_capture()
# Set when this characteristic has a value waiting in self._write_data.
self._write_event = asyncio.ThreadSafeFlag()
# The connection of the most recent write, or a tuple of
# (connection, data) if capture is enabled.
self._write_data = None
if notify:
flags |= _FLAG_NOTIFY
if indicate:
flags |= _FLAG_INDICATE
# TODO: This should probably be a dict of connection to (ev, status).
# Right now we just support a single indication at a time.
self._indicate_connection = None
self._indicate_event = asyncio.ThreadSafeFlag()
self._indicate_status = None
self.uuid = uuid
self.flags = flags
self._value_handle = None
self._initial = initial
# Generate tuple for gatts_register_services.
def _tuple(self):
if self.descriptors:
return (self.uuid, self.flags, tuple(d._tuple() for d in self.descriptors))
else:
# Workaround: v1.19 and below can't handle an empty descriptor tuple.
return (self.uuid, self.flags)
def notify(self, connection, data=None):
if not (self.flags & _FLAG_NOTIFY):
raise ValueError("Not supported")
ble.gatts_notify(connection._conn_handle, self._value_handle, data)
async def indicate(self, connection, timeout_ms=1000):
if not (self.flags & _FLAG_INDICATE):
raise ValueError("Not supported")
if self._indicate_connection is not None:
raise ValueError("In progress")
if not connection.is_connected():
raise ValueError("Not connected")
self._indicate_connection = connection
self._indicate_status = None
try:
with connection.timeout(timeout_ms):
ble.gatts_indicate(connection._conn_handle, self._value_handle)
await self._indicate_event.wait()
if self._indicate_status != 0:
raise GattError(self._indicate_status)
finally:
self._indicate_connection = None
def _indicate_done(conn_handle, value_handle, status):
if characteristic := _registered_characteristics.get(value_handle, None):
if connection := DeviceConnection._connected.get(conn_handle, None):
if not characteristic._indicate_connection:
# Timeout.
return
# See TODO in __init__ to support multiple concurrent indications.
assert connection == characteristic._indicate_connection
characteristic._indicate_status = status
characteristic._indicate_event.set()
class BufferedCharacteristic(Characteristic):
def __init__(self, service, uuid, max_len=20, append=False):
super().__init__(service, uuid, read=True)
self._max_len = max_len
self._append = append
def _register(self, value_handle):
super()._register(value_handle)
ble.gatts_set_buffer(value_handle, self._max_len, self._append)
class Descriptor(BaseCharacteristic):
def __init__(self, characteristic, uuid, read=False, write=False, initial=None):
characteristic.descriptors.append(self)
# Workaround for https://github.com/micropython/micropython/issues/6864
flags = 0
if read:
flags |= _FLAG_DESC_READ
if write:
self._write_event = asyncio.ThreadSafeFlag()
self._write_data = None
flags |= _FLAG_DESC_WRITE
self.uuid = uuid
self.flags = flags
self._value_handle = None
self._initial = initial
# Generate tuple for gatts_register_services.
def _tuple(self):
return (self.uuid, self.flags)
# Turn the Service/Characteristic/Descriptor classes into a registration tuple
# and then extract their value handles.
def register_services(*services):
ensure_active()
_registered_characteristics.clear()
handles = ble.gatts_register_services(tuple(s._tuple() for s in services))
for i in range(len(services)):
service_handles = handles[i]
service = services[i]
n = 0
for characteristic in service.characteristics:
characteristic._register(service_handles[n])
n += 1
for descriptor in characteristic.descriptors:
descriptor._register(service_handles[n])
n += 1

30
robot/buzzer.py Normal file
View File

@ -0,0 +1,30 @@
import sys
import utime
class Buzzer:
def __init__(self):
pass
# ------------------------------------
# Vittascience
# Example for playing sound
# ------------------------------------
def _pitch(self, robot, noteFrequency, noteDuration, silence_ms = 10):
if noteFrequency is not 0:
microsecondsPerWave = 1e6 / noteFrequency
millisecondsPerCycle = 1000 / (microsecondsPerWave * 2)
loopTime = noteDuration * millisecondsPerCycle
for x in range(loopTime):
# Buzzer high: 0
robot.controlBuzzer(0)
utime.sleep_us(int(microsecondsPerWave))
# buzzer low: 1
robot.controlBuzzer(1)
utime.sleep_us(int(microsecondsPerWave))
else:
utime.sleep_ms(int(noteDuration))
utime.sleep_ms(silence_ms)
def pitch(self, robot, noteFrequency, noteDuration, silence_ms = 10):
#print("[DEBUG][pitch]: Frequency {:5} Hz, Duration {:4} ms, silence {:4} ms".format(noteFrequency, noteDuration, silence_ms))
self._pitch(robot, noteFrequency, noteDuration, silence_ms)

497
robot/main.py Normal file
View File

@ -0,0 +1,497 @@
import machine
import utime, sys
import json
from stm32_alphabot_v2 import AlphaBot_v2
import gc
from stm32_ssd1306 import SSD1306, SSD1306_I2C
from stm32_vl53l0x import VL53L0X
from stm32_nec import NEC_8, NEC_16
import neopixel
import _thread
import os
#import bluetooth
#from stm32_ble_uart import BLEUART
import buzzer
# variable:
alphabot = oled = vl53l0x = None
ir_current_remote_code = None
dict_base=dict([('C-',32.70),('C#',34.65),('D-',36.71),('D#',38.89),('E-',41.20),('E#',43.65),('F-',43.65),('F#',46.35),('G-',49.00),('G#',51.91),('A-',55.00),('A#',58.27),('B-',61.74),('S-',0)])
# -------------------------------
# neopixel
# -------------------------------
class FoursNeoPixel():
def __init__(self, pin_number):
self._pin = pin_number
self._max_leds = 4
self._leds = neopixel.NeoPixel(self._pin, 4)
def set_led(self, addr, red, green, blue):
if addr >= 0 and addr < self._max_leds:
# coded on BGR
self._leds[addr] = (blue, green, red)
def set_led2(self, addr, rgb):
if addr >= 0 and addr < self._max_leds:
# coded on BGR
self._leds[addr] = rgb
def show(self):
self._leds.write()
def clear(self):
for i in range (0, self._max_leds):
self.set_led(i, 0,0,0)
self.show()
def neo_french_flag_threaded(leds):
while True:
leds.set_led(0, 250, 0, 0)
leds.set_led(1, 250, 0, 0)
leds.set_led(2, 250, 0, 0)
leds.set_led(3, 250, 0, 0)
leds.show()
utime.sleep(1)
leds.set_led(0, 250, 250, 250)
leds.set_led(1, 250, 250, 250)
leds.set_led(2, 250, 250, 250)
leds.set_led(3, 250, 250, 250)
leds.show()
utime.sleep(1)
leds.set_led(0, 0, 0, 250)
leds.set_led(1, 0, 0, 250)
leds.set_led(2, 0, 0, 250)
leds.set_led(3, 0, 0, 250)
leds.show()
utime.sleep(1)
leds.clear()
utime.sleep(2)
def neo_french_flag(fours_rgb_leds):
_thread.start_new_thread(neo_french_flag_threaded, ([fours_rgb_leds]))
# ----------------------------
# Remote Control
# ----------------------------
# Remote control Correlation table
# |-----------------| |----------------------|
# | | | | | | | |
# | CH- | CH | CH+ | | Vol- | Play | Vol+ |
# | | | | | | Pause | |
# | | | | | | | |
# |-----------------| |----------------------|
# | | | | | | | |
# | |<< | >>| | >|| | | Setup | Up | Stop |
# | | | | | | | Mode |
# | | | | | | | |
# |-----------------| |----------------------|
# | | | | | | | |
# | - | + | EQ | | Left | Enter | Right|
# | | | | | | Save | |
# | | | | | | | |
# |-----------------| |----------------------|
# | | | | | | | |
# | 0 |100+ | 200+| <==> | 0 | Down | Back |
# | | | | | | | |
# |-----------------| |----------------------|
# | | | | | | | |
# | 1 | 2 | 3 | | 1 | 2 | 3 |
# | | | | | | | |
# |-----------------| |----------------------|
# | | | | | | | |
# | 4 | 5 | 6 | | 4 | 5 | 6 |
# | | | | | | | |
# |-----------------| |----------------------|
# | | | | | | | |
# | 7 | 8 | 9 | | 7 | 8 | 9 |
# | | | | | | | |
# |-----------------| |----------------------|
#
def remoteNEC_basicBlack_getButton(hexCode):
if hexCode == 0x0c: return "1"
elif hexCode == 0x18: return "2"
elif hexCode == 0x5e: return "3"
elif hexCode == 0x08: return "4"
elif hexCode == 0x1c: return "5"
elif hexCode == 0x5a: return "6"
elif hexCode == 0x42: return "7"
elif hexCode == 0x52: return "8"
elif hexCode == 0x4a: return "9"
elif hexCode == 0x16: return "0"
elif hexCode == 0x40: return "up"
elif hexCode == 0x19: return "down"
elif hexCode == 0x07: return "left"
elif hexCode == 0x09: return "right"
elif hexCode == 0x15: return "enter_save"
elif hexCode == 0x0d: return "back"
elif hexCode == 0x45: return "volMinus"
elif hexCode == 0x47: return "volPlus"
elif hexCode == 0x46: return "play_pause"
elif hexCode == 0x44: return "setup"
elif hexCode == 0x43: return "stop_mode"
else: return "NEC remote code error"
def remoteNEC_callback(data, addr, ctrl):
global ir_current_remote_code
print("coucou")
if data < 0: # NEC protocol sends repeat codes.
print('Repeat code.')
else:
#print('Data {:02x} Addr {:04x} Ctrl {:02x}'.format(data, addr, ctrl))
ir_current_remote_code = remoteNEC_basicBlack_getButton(data)
print('Data {:02x} Addr {:04x} Ctrl {:02x} {}'.format(data, addr, ctrl, ir_current_remote_code))
# ----------------------------
# play music
# ----------------------------
def music_play():
d= [['C-3', 4 ], ['D-3', 4 ], ['E-3', 4 ], ['F-3', 4 ], ['G-3', 4 ] , ['A-3', 4 ], ['B-3', 4 ]]
freq_list = [ dict_base[d[i][0][:2] ] * 2**(int(d[i][0][2]) - 1 ) for i in range(0, len(d), 1 ) ]
duration_list= [int(d[i][1]) * 125 for i in range(0,len(d), 1)]
buz = buzzer.Buzzer()
for i in range(len(freq_list)):
buz.pitch(alphabot, freq_list[i], duration_list[i], 50)
# ----------------------------
# Follow line
# ----------------------------
_BLACKLIMIT = 650
DISPLAY_LINE_TRACKING_INFO = 1
def _motor_left_right(ml, mr):
alphabot.setMotors(left=ml, right=mr)
if DISPLAY_LINE_TRACKING_INFO:
oled.text('L {}'.format(ml),64, 0)
oled.text('R {}'.format(mr),64, 16)
def show_motor_left_right(ml, mr):
if DISPLAY_LINE_TRACKING_INFO:
oled.text('L {}'.format(ml),64, 0)
oled.text('R {}'.format(mr),64, 16)
def isSensorAboveLine(robot, sensorName, blackLimit = 300):
sensorsValue = robot.TRSensors_readLine(sensor=0) # all sensors values
if 'IR' in sensorName:
if sensorName=='IR1' and sensorsValue[0] < blackLimit: return True
elif sensorName=='IR2' and sensorsValue[1] < blackLimit: return True
elif sensorName=='IR3' and sensorsValue[2] < blackLimit: return True
elif sensorName=='IR4' and sensorsValue[3] < blackLimit: return True
elif sensorName=='IR5' and sensorsValue[4] < blackLimit: return True
else: return False
else:
raise ValueError("name '" + sensorName + "' is not a sensor option")
SPEED_MOTOR=13
def line_follower(limit=_BLACKLIMIT):
oled.fill(0)
if alphabot.readUltrasonicDistance() <= 5:
alphabot.stop()
#music_play()
else:
if not isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
oled.fill(0)
oled.show()
oled.text('En arriere', 0, 0)
oled.show()
alphabot.moveBackward(SPEED_MOTOR)
if not isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
oled.fill(0)
oled.show()
oled.text('A Gauche', 0, 0)
oled.show()
alphabot.setMotorLeft(SPEED_MOTOR)
alphabot.setMotorRight(0)
if not isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
oled.fill(0)
oled.show()
oled.text('Tout Droit', 0, 0)
oled.show()
alphabot.setMotorLeft(SPEED_MOTOR)
alphabot.setMotorRight(SPEED_MOTOR)
if not isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
alphabot.setMotorLeft(SPEED_MOTOR)
alphabot.setMotorRight(5)
if isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
oled.fill(0)
oled.show()
oled.text('A droite', 0, 0)
oled.show()
alphabot.setMotorLeft(0)
alphabot.setMotorRight(SPEED_MOTOR)
if isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
alphabot.moveBackward(SPEED_MOTOR)
if isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and not isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
alphabot.setMotorLeft(5)
alphabot.setMotorLeft(SPEED_MOTOR)
if isSensorAboveLine(alphabot, 'IR2', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR3', blackLimit=limit) and isSensorAboveLine(alphabot, 'IR4', blackLimit=limit):
alphabot.moveBackward(SPEED_MOTOR)
def line_follower_simple(limit=_BLACKLIMIT):
oled.fill(0)
if alphabot.readUltrasonicDistance() <= 5:
alphabot.stop()
oled.text('Obstacle', 4*8, 0)
oled.text('detected', 4*8, 16)
oled.text('STOPPED!', 4*8, 32)
#music_play()
else:
# get the light detection measurement on one time
line_detection = alphabot.TRSensors_readLine()
if DISPLAY_LINE_TRACKING_INFO:
print("readline:", line_detection)
if DISPLAY_LINE_TRACKING_INFO: oled.text('{:.02f}'.format(line_detection[1]), 0, 0)
if DISPLAY_LINE_TRACKING_INFO: oled.text('{:.02f}'.format(line_detection[2]), 0, 16)
if DISPLAY_LINE_TRACKING_INFO: oled.text('{:.02f}'.format(line_detection[3]), 0, 32)
if line_detection[2] < limit:
# we are on the line
alphabot.setMotors(right=22, left=22)
show_motor_left_right(22, 22)
elif line_detection[1] < limit:
#alphabot.turnLeft(65, 25)
alphabot.turnLeft(65, duration_ms=50)
show_motor_left_right(65, 0)
elif line_detection[3] < limit:
#alphabot.turnRight(65, 25)
alphabot.turnRight(65, duration_ms=50)
show_motor_left_right(0, 65)
elif line_detection[2] > limit:
alphabot.moveBackward(35, duration_ms=50)
show_motor_left_right(-35, -35)
# if (line_detection[1] < limit):
# alphabot.turnLeft(65, 25)
# show_motor_left_right(65, 0)
# elif line_detection[3] < limit:
# alphabot.turnRight(65, 25)
# show_motor_left_right(0, 65)
# elif line_detection[2] > limit:
# alphabot.setMotors(right=22, left=22)
# show_motor_left_right(22, 22)
# elif line_detection[2] < limit:
# alphabot.moveBackward(35, duration_ms=50)
# show_motor_left_right(-35, -35)
if vl53l0x is not None:
oled.text('tof {:4.0f}mm'.format(vl53l0x.getRangeMillimeters()), 0, 48)
oled.show()
# ------------------------------------------------
last_proportional = 0
integral = 0
maximum = 100
derivative = 0
# Algo from: https://www.waveshare.com/w/upload/7/74/AlphaBot2.tar.gz
def line_follower2():
oled.fill(0)
if alphabot.readUltrasonicDistance() <= 5:
print("Obstacle!!!!!")
alphabot.stop()
#music_play()
else:
global last_proportional
global integral
global derivative
# get the light detection measurement on one time
position,sensors_line = alphabot.TRSensors_position_readLine()
if (sensors_line[0] > 900 and sensors_line[1] > 900 and sensors_line[2] > 900 and sensors_line[3] > 900 and sensors_line[4] > 900):
_motor_left_right(0, 0)
return
if DISPLAY_LINE_TRACKING_INFO:
print("readline:", position, sensors_line)
oled.text('{:.02f}'.format(sensors_line[1]), 0, 0)
oled.text('{:.02f}'.format(sensors_line[2]), 0, 16)
oled.text('{:.02f}'.format(sensors_line[3]), 0, 32)
# The "proportional" term should be 0 when we are on the line.
proportional = position - 2000
# Compute the derivative (change) and integral (sum) of the position.
derivative = proportional - last_proportional
integral += proportional
# Remember the last position.
last_proportional = proportional
'''
// Compute the difference between the two motor power settings,
// m1 - m2. If this is a positive number the robot will turn
// to the right. If it is a negative number, the robot will
// turn to the left, and the magnitude of the number determines
// the sharpness of the turn. You can adjust the constants by which
// the proportional, integral, and derivative terms are multiplied to
// improve performance.
'''
power_difference = proportional/30 + integral/10000 + derivative*2
if (power_difference > maximum):
power_difference = maximum
if (power_difference < - maximum):
power_difference = - maximum
print("Line follower: ", position, power_difference)
if (power_difference < 0):
_motor_left_right(maximum + power_difference, maximum)
else:
_motor_left_right(maximum, maximum - power_difference)
utime.sleep_ms(100)
alphabot.stop()
oled.show()
# ----------------------------
# Motor move
# ----------------------------
def move_right(t=30):
alphabot.turnRight(20, t)
def move_left(t=30):
alphabot.turnLeft(20, t)
def move_forward(t=200):
if alphabot.readUltrasonicDistance() > 10:
alphabot.moveForward(20)
utime.sleep_ms(t)
alphabot.stop()
else:
alphabot.stop()
def move_backward(t=200):
alphabot.moveBackward(20)
utime.sleep_ms(t)
alphabot.stop()
def move_circumvention():
move_left(450)
move_forward(400)
move_right(450)
move_forward(400)
move_right(450)
move_forward(400)
move_left(450)
# ----------------------------
# BLE UART
# ----------------------------
# m: move
# b: move back
# l: left
# r: right
# s: stop
# M: music
# q: quit
def bluetooth_serial_processing(ble_uart):
while True:
utime.sleep_ms(200)
if ble_uart.any():
bluetoothData = ble_uart.read().decode().strip()
print(str(bluetoothData));
if 'r'.find(bluetoothData) + 1 == 1:
move_right()
elif 'l'.find(bluetoothData) + 1 == 1:
move_left()
elif 'm'.find(bluetoothData) + 1 == 1:
move_forward()
elif 'b'.find(bluetoothData) + 1 == 1:
move_backward()
elif 's'.find(bluetoothData) + 1 == 1:
alphabot.stop()
elif 'M'.find(bluetoothData) + 1 == 1:
music_play()
elif 'q'.find(bluetoothData) + 1 == 1:
break
else:
pass
# ----------------------------
# INIT
# ----------------------------
# init Alphabot
try:
alphabot = AlphaBot_v2()
except Exception as e:
print('alphabot exception occurred: {}'.format(e))
alphabot = None
try:
if alphabot is not None:
oled = SSD1306_I2C(128, 64, alphabot.i2c)
except Exception as e:
print('OLED exception occurred: {}'.format(e))
oled = None
try:
if alphabot is not None:
vl53l0x = VL53L0X(i2c=alphabot.i2c)
except Exception as e:
print('vl53l0x exception occurred: {}'.format(e))
vl53l0x = None
try:
classes = (NEC_8, NEC_16)
if alphabot is not None:
ir_remote = classes[0](alphabot.pin_IR, remoteNEC_callback)
else:
ir_remote = None
except Exception as e:
print('ir_remote exception occurred: {}'.format(e))
ir_remote = None
neopixel_leds = FoursNeoPixel(alphabot.pin_RGB)
#ble = bluetooth.BLE()
#uart = BLEUART(ble)
### Print system
print()
print(f"Platform: {sys.platform}")
print(f"MicroPython ver: {os.uname().release} ({os.uname().version})")
print(f"Machine ID: {os.uname().machine}")
print(f"CPU Frequency: {machine.freq()} Hz")
print()
oled.text("Martian", 4*8, 0)
oled.show()
print("Ready to drive on Mars")
neo_french_flag(neopixel_leds)
print("We drive on Mars")
while True:
# IR
# enter_save aka + : robot stop
# up aka >> : robot forward
# down aka 100+ : robot backward
# left aka - : robot go to left
# right aka EQ : robot go to right
# play_pause aka CH : follow line
# setup aka << : bluetooth uart
# 9 aka 9 : play music
utime.sleep_ms(20)
gc.collect()
if ir_current_remote_code == "enter_save":
alphabot.stop()
elif ir_current_remote_code == "up":
move_forward()
elif ir_current_remote_code == "down":
move_backward()
elif ir_current_remote_code == "left":
move_left()
elif ir_current_remote_code == "right":
move_right()
elif ir_current_remote_code == "play_pause":
line_follower_simple()
elif ir_current_remote_code == "setup":
bluetooth_serial_processing()
elif ir_current_remote_code == "9":
music_play()
else:
line_follower_simple()

45
robot/neopixel.py Normal file
View File

@ -0,0 +1,45 @@
# NeoPixel driver for MicroPython
# MIT license; Copyright (c) 2016 Damien P. George, 2021 Jim Mussared
from machine import bitstream
class NeoPixel:
# G R B W
ORDER = (1, 0, 2, 3)
def __init__(self, pin, n, bpp=3, timing=1):
self.pin = pin
self.n = n
self.bpp = bpp
self.buf = bytearray(n * bpp)
self.pin.init(pin.OUT)
# Timing arg can either be 1 for 800kHz or 0 for 400kHz,
# or a user-specified timing ns tuple (high_0, low_0, high_1, low_1).
self.timing = (
((400, 850, 800, 450) if timing else (800, 1700, 1600, 900))
if isinstance(timing, int)
else timing
)
def __len__(self):
return self.n
def __setitem__(self, i, v):
offset = i * self.bpp
for i in range(self.bpp):
self.buf[offset + self.ORDER[i]] = v[i]
def __getitem__(self, i):
offset = i * self.bpp
return tuple(self.buf[offset + self.ORDER[i]] for i in range(self.bpp))
def fill(self, v):
b = self.buf
for i in range(self.bpp):
c = v[i]
for j in range(self.ORDER[i], len(self.buf), self.bpp):
b[j] = c
def write(self):
# BITSTREAM_TYPE_HIGH_LOW = 0
bitstream(self.pin, 0, self.timing, self.buf)

199
robot/stm32_TRsensors.py Normal file
View File

@ -0,0 +1,199 @@
"""
QTRSensors.h - Originally Arduino Library for using Pololu QTR reflectance sensors and reflectance sensor arrays
MIT Licence
Copyright (c) 2008-2012 waveshare Corporation. For more information, see
https://www.waveshare.com/wiki/AlphaBot2-Ar
Copyright (c) 2021 leomlr (Léo Meillier). For more information, see
https://github.com/vittascience/stm32-libraries
https://vittascience.com/stm32/
You may freely modify and share this code, as long as you keep this notice intact.
Disclaimer: To the extent permitted by law, waveshare provides this work
without any warranty. It might be defective, in which case you agree
to be responsible for all resulting costs and damages.
Author: Léo Meillier (leomlr)
Date: 07/2021
Note: library adapted in micropython for using 5 QTR sensors on Alphabot v2 robot controlled by STM32 board.
"""
import pyb
from micropython import const
import utime
PIN_CS = 'D10'
PIN_DOUT = 'D11'
PIN_ADDR = 'D12'
PIN_CLK = 'D13'
NUMSENSORS = const(5)
QTR_EMITTERS_OFF = const(0x00)
QTR_EMITTERS_ON = const(0x01)
QTR_EMITTERS_ON_AND_OFF = const(0x02)
QTR_NO_EMITTER_PIN = const(0xff)
QTR_MAX_SENSORS = const(16)
class TRSensors(object):
""" Base class data member initialization (called by derived class init()). """
def __init__(self, cs=PIN_CS, dout=PIN_DOUT, addr=PIN_ADDR, clk=PIN_CLK):
self._cs = pyb.Pin(cs, pyb.Pin.OUT)
self._dout = pyb.Pin(dout, pyb.Pin.IN)
self._addr = pyb.Pin(addr, pyb.Pin.OUT)
self._clk = pyb.Pin(clk, pyb.Pin.OUT)
self._numSensors = NUMSENSORS
self.calibratedMin = [0] * self._numSensors
self.calibratedMax = [1023] * self._numSensors
self.last_value = 0
""" Reads the sensor values using TLC1543 ADC chip into an array.
The values returned are a measure of the reflectance in abstract units,
with higher values corresponding to lower reflectance (e.g. a black
surface or a void). """
def analogRead(self):
value = [0]* (self._numSensors+1)
#Read Channel0~channel4 AD value
for j in range(0, self._numSensors+1):
self._cs.off()
for i in range(0,4):
#sent 4-bit Address
if (j >> (3 - i)) & 0x01:
self._addr.on()
else:
self._addr.off()
#read MSB 4-bit data
value[j] <<= 1
if self._dout.value():
value[j] |= 0x01
self._clk.on()
self._clk.off()
for i in range(0, self._numSensors+1):
#read LSB 8-bit data
value[j] <<= 1
if self._dout.value():
value[j] |= 0x01
self._clk.on()
self._clk.off()
#no mean ,just delay
#for i in range(0,6):
# self._clk.on()
# self._clk.off()
utime.sleep_us(100)
self._cs.on()
return value[1:]
""" Reads the sensors 10 times and uses the results for
calibration. The sensor values are not returned instead, the
maximum and minimum values found over time are stored internally
and used for the readCalibrated() method. """
def calibrate(self):
sensor_values = []
max_sensor_values = [0]*self._numSensors
min_sensor_values = [0]*self._numSensors
for j in range(0, 10):
sensor_values = self.analogRead()
for i in range(0, self._numSensors):
# set the max we found THIS time
if j == 0 or max_sensor_values[i] < sensor_values[i]:
max_sensor_values[i] = sensor_values[i]
# set the min we found THIS time
if j == 0 or min_sensor_values[i] > sensor_values[i]:
min_sensor_values[i] = sensor_values[i]
# record the min and max calibration values
for i in range(0, self._numSensors):
if min_sensor_values[i] > self.calibratedMax[i]:
self.calibratedMax[i] = min_sensor_values[i]
if max_sensor_values[i] < self.calibratedMin[i]:
self.calibratedMin[i] = max_sensor_values[i]
""" Returns values calibrated to a value between 0 and 1000, where
0 corresponds to the minimum value read by calibrate() and 1000
corresponds to the maximum value. Calibration values are
stored separately for each sensor, so that differences in the
sensors are accounted for automatically. """
def readCalibrated(self):
# read the needed values
sensor_values = self.analogRead()
for i in range(self._numSensors):
denominator = self.calibratedMax[i] - self.calibratedMin[i]
value = 0
if denominator is not 0:
value = (sensor_values[i] - self.calibratedMin[i]) * 1000 / denominator
if value < 0:
value = 0
elif value > 1000:
value = 1000
sensor_values[i] = value
return sensor_values
""" Operates the same as read calibrated, but also returns an
estimated position of the robot with respect to a line. The
estimate is made using a weighted average of the sensor indices
multiplied by 1000, so that a return value of 0 indicates that
the line is directly below sensor 0, a return value of 1000
indicates that the line is directly below sensor 1, 2000
indicates that it's below sensor 2000, etc. Intermediate
values indicate that the line is between two sensors. The
formula is:
0*value0 + 1000*value1 + 2000*value2 + ...
--------------------------------------------
value0 + value1 + value2 + ...
By default, this function assumes a dark line (high values)
surrounded by white (low values). If your line is light on
black, set the optional second argument white_line to true. In
this case, each sensor value will be replaced by (1000-value)
before the averaging. """
def readLine(self, white_line = 0):
sensor_values = self.readCalibrated()
avg = 0
sum = 0
on_line = 0
for i in range(0, self._numSensors):
value = sensor_values[i]
if white_line:
value = 1000-value
# keep track of whether we see the line at all
if value > 200:
on_line = 1
# only average in values that are above a noise threshold
if value > 50:
avg += value * (i * 1000) # this is for the weighted total,
sum += value # this is for the denominator
if on_line != 1:
# If it last read to the left of center, return 0.
if self.last_value < (self._numSensors - 1)*1000/2:
#print("left")
self.last_value = 0
# If it last read to the right of center, return the max.
else:
#print("right")
self.last_value = (self._numSensors - 1)*1000
else:
self.last_value = avg/sum
return self.last_value,sensor_values;

265
robot/stm32_alphabot_v2.py Normal file
View File

@ -0,0 +1,265 @@
"""
MicroPython for AlphaBot2-Ar from Waveshare.
https://github.com/vittascience/stm32-libraries
https://www.waveshare.com/wiki/AlphaBot2-Ar
MIT License
Copyright (c) 2021 leomlr (Léo Meillier)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
__version__ = "0.0.0-auto.0"
__repo__ = "show"
from stm32_TRsensors import TRSensors
from stm32_pcf8574 import PCF8574
import machine
import pyb
import utime
ALPHABOT_V2_PIN_AIN2 = 'A0'
ALPHABOT_V2_PIN_AIN1 = 'A1'
ALPHABOT_V2_PIN_BIN1 = 'A2'
ALPHABOT_V2_PIN_BIN2 = 'A3'
ALPHABOT_V2_PIN_ECHO = 'D2'
ALPHABOT_V2_PIN_TRIG = 'D3'
ALPHABOT_V2_PIN_IR = 'D4'
ALPHABOT_V2_PIN_PWMB = 'D5'
ALPHABOT_V2_PIN_PWMA = 'D6'
ALPHABOT_V2_PIN_RGB = 'D7'
ALPHABOT_V2_PIN_OLED_D_C = 'D8'
ALPHABOT_V2_PIN_OLED_RESET = 'D9'
ALPHABOT_V2_PIN_TRS_CS = 'D10'
ALPHABOT_V2_PIN_TRS_DOUT = 'D11'
ALPHABOT_V2_PIN_TRS_ADDR = 'D12'
ALPHABOT_V2_PIN_TRS_CLK = 'D13'
ALPHABOT_V2_PCF8574_I2C_ADDR = 0x20
ALPHABOT_V2_OLED_I2C_ADDR_DC_OFF = 0x3c
ALPHABOT_V2_OLED_I2C_ADDR_DC_ON = 0x3d
class AlphaBot_v2(object):
def __init__(self):
self.ain1 = pyb.Pin(ALPHABOT_V2_PIN_AIN1, pyb.Pin.OUT)
self.ain2 = pyb.Pin(ALPHABOT_V2_PIN_AIN2, pyb.Pin.OUT)
self.bin1 = pyb.Pin(ALPHABOT_V2_PIN_BIN1, pyb.Pin.OUT)
self.bin2 = pyb.Pin(ALPHABOT_V2_PIN_BIN2, pyb.Pin.OUT)
self.pin_PWMA = pyb.Pin(ALPHABOT_V2_PIN_PWMA, pyb.Pin.OUT_PP)
tim_A = pyb.Timer(1, freq=500)
self.PWMA = tim_A.channel(1, pyb.Timer.PWM, pin=self.pin_PWMA)
self.pin_PWMB = pyb.Pin(ALPHABOT_V2_PIN_PWMB, pyb.Pin.OUT_PP)
tim_B = pyb.Timer(2, freq=500)
self.PWMB = tim_B.channel(1, pyb.Timer.PWM, pin=self.pin_PWMB)
self.stop()
print('[Alpha_INFO]: Motors initialised')
self.trig = pyb.Pin(ALPHABOT_V2_PIN_TRIG, pyb.Pin.OUT)
self.echo = pyb.Pin(ALPHABOT_V2_PIN_ECHO, pyb.Pin.IN)
self.pin_RGB = pyb.Pin(ALPHABOT_V2_PIN_RGB, pyb.Pin.OUT)
self.tr_sensors = TRSensors(
cs = ALPHABOT_V2_PIN_TRS_CS,
dout = ALPHABOT_V2_PIN_TRS_DOUT,
addr = ALPHABOT_V2_PIN_TRS_ADDR,
clk = ALPHABOT_V2_PIN_TRS_CLK
)
print('[Alpha_INFO]: TR sensors initialised')
self.i2c = machine.I2C(1)
self.LEFT_OBSTACLE = 'L'
self.RIGHT_OBSTACLE = 'R'
self.BOTH_OBSTACLE = 'B'
self.NO_OBSTACLE = 'N'
self.JOYSTICK_UP = 'up'
self.JOYSTICK_RIGHT = 'right'
self.JOYSTICK_LEFT = 'left'
self.JOYSTICK_DOWN = 'down'
self.JOYSTICK_CENTER = 'center'
print('[Alpha_INFO]: IR detectors initialised (for obstacles)')
self.pin_IR = pyb.Pin(ALPHABOT_V2_PIN_IR, pyb.Pin.IN)
print('[Alpha_INFO]: IR receiver initialised (for remotes)')
self.pin_oled_reset = pyb.Pin(ALPHABOT_V2_PIN_OLED_RESET, pyb.Pin.OUT)
self.pin_oled_reset.off()
utime.sleep_ms(10)
self.pin_oled_reset.on()
self.pin_DC = pyb.Pin(ALPHABOT_V2_PIN_OLED_D_C, pyb.Pin.OUT)
print('[Alpha_INFO]: OLED screen initialised')
self._pcf8574 = PCF8574(self.i2c, addr=ALPHABOT_V2_PCF8574_I2C_ADDR)
def setPWMA(self, value):
self.PWMA.pulse_width_percent(value)
def setPWMB(self, value):
self.PWMB.pulse_width_percent(value)
def setMotors(self, left=None, right=None):
if left is not None:
if left >= 0 and left <= 100:
self.ain1.off()
self.ain2.on()
self.setPWMA(left)
elif left >= -100 and left < 0:
self.ain1.on()
self.ain2.off()
self.setPWMA(-left)
if right is not None:
if right >= 0 and right <= 100:
self.bin1.off()
self.bin2.on()
self.setPWMB(right)
elif right >= -100 and right < 0:
self.bin1.on()
self.bin2.off()
self.setPWMB(-right)
def stop(self):
self.setMotors(left=0, right=0)
def moveForward(self, speed, duration_ms=0):
self.setMotors(left=speed, right=speed)
if duration_ms:
utime.sleep_ms(duration_ms)
self.stop()
def moveBackward(self, speed, duration_ms=0):
self.setMotors(left=-speed, right=-speed)
if duration_ms:
utime.sleep_ms(duration_ms)
self.stop()
def turnLeft(self, speed, duration_ms=0):
if speed < 20:
self.setMotors(left=speed, right=50-speed)
else:
self.setMotors(left=30-speed, right=speed)
if duration_ms:
utime.sleep_ms(duration_ms)
self.stop()
def turnRight(self, speed, duration_ms=0):
if speed < 20:
self.setMotors(left=50-speed, right=speed)
else:
self.setMotors(left=speed, right=30-speed)
if duration_ms:
utime.sleep_ms(duration_ms)
self.stop()
def calibrateLineFinder(self):
print("[Alpha_INFO]: TR sensors calibration ...\\n")
for i in range(0, 100):
if i<25 or i>= 75:
self.turnRight(15)
else:
self.turnLeft(15)
self.TRSensors_calibrate()
self.stop()
print("Calibration done.\\n")
print(str(self.tr_sensors.calibratedMin) + '\\n')
print(str(self.tr_sensors.calibratedMax) + '\\n')
utime.sleep_ms(500)
def TRSensors_calibrate(self):
self.tr_sensors.calibrate()
def TRSensors_read(self, sensor = 0):
return self.tr_sensors.analogRead()
def TRSensors_readLine(self, sensor = 0):
position, sensor_values = self.tr_sensors.readLine()
if sensor is 0:
return sensor_values
else:
return sensor_values[sensor-1]
def TRSensors_position_readLine(self, sensor = 0):
return self.tr_sensors.readLine()
def readUltrasonicDistance(self, length=15, timeout_us = 30000):
measurements = 0
for i in range(length):
self.trig.off()
utime.sleep_us(2)
self.trig.on()
utime.sleep_us(10)
self.trig.off()
self.echo.value()
measurements += machine.time_pulse_us(self.echo, 1, timeout_us)/1e6 # t_echo in seconds
duration = measurements/length
return 343 * duration/2 * 100
def getOLEDaddr(self):
if self.pin_DC.value():
return ALPHABOT_V2_OLED_I2C_ADDR_DC_ON
else:
return ALPHABOT_V2_OLED_I2C_ADDR_DC_OFF
# Drivers for PCF8574T
def controlBuzzer(self, state):
self._pcf8574.pin(5, state)
def getJoystickValue(self):
i = 0
for i in range(5):
if not self._pcf8574.pin(i): break
elif i == 4: i = None
if i == 0:
return self.JOYSTICK_UP
elif i == 1:
return self.JOYSTICK_RIGHT
elif i == 2:
return self.JOYSTICK_LEFT
elif i == 3:
return self.JOYSTICK_DOWN
elif i == 4:
return self.JOYSTICK_CENTER
else:
return None
def readInfrared(self):
left = not self._pcf8574.pin(7)
right = not self._pcf8574.pin(6)
if left and not right:
return self.LEFT_OBSTACLE
elif not left and right:
return self.RIGHT_OBSTACLE
elif left and right:
return self.BOTH_OBSTACLE
else:
return self.NO_OBSTACLE

View File

@ -0,0 +1,91 @@
# Exemple pour générer des trames d'advertising pour le BLE
from micropython import const
import struct
import bluetooth
# Les trames d'advertising sont sont des paquets répétés ayant la structure suivante :
# 1 octet indiquant la taille des données (N + 1)
# 1 octet indiquant le type de données (voir les constantes ci-dessous)
# N octets de données du type indiqué
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
_ADV_TYPE_MANUFACTURER = const(0xFF)
# Génère une trame qui sera passée à la méthode gap_advertise(adv_data=...).
def adv_payload(
limited_disc=False,
br_edr=False,
name=None,
services=None,
appearance=0,
manufacturer=0,
):
payload = bytearray()
def _append(adv_type, value):
nonlocal payload
payload += struct.pack("BB", len(value) + 1, adv_type) + value
_append(
_ADV_TYPE_FLAGS,
struct.pack("B", (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)),
)
if name:
_append(_ADV_TYPE_NAME, name)
if services:
for uuid in services:
b = bytes(uuid)
if len(b) == 2:
_append(_ADV_TYPE_UUID16_COMPLETE, b)
elif len(b) == 4:
_append(_ADV_TYPE_UUID32_COMPLETE, b)
elif len(b) == 16:
_append(_ADV_TYPE_UUID128_COMPLETE, b)
if appearance:
# Voir org.bluetooth.characteristic.gap.appearance.xml
_append(_ADV_TYPE_APPEARANCE, struct.pack("<h", appearance))
if manufacturer:
_append(_ADV_TYPE_MANUFACTURER, manufacturer)
return payload
def decode_field(payload, adv_type):
i = 0
result = []
while i + 1 < len(payload):
if payload[i + 1] == adv_type:
result.append(payload[i + 2 : i + payload[i] + 1])
i += 1 + payload[i]
return result
def decode_name(payload):
n = decode_field(payload, _ADV_TYPE_NAME)
return str(n[0], "utf-8") if n else ""
def decode_services(payload):
services = []
for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
services.append(bluetooth.UUID(struct.unpack("<h", u)[0]))
for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
services.append(bluetooth.UUID(struct.unpack("<d", u)[0]))
for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
services.append(bluetooth.UUID(u))
return services

118
robot/stm32_ble_uart.py Normal file
View File

@ -0,0 +1,118 @@
# Objet du script : mise en oeuvre du service UART BLE de Nordic Semiconductors (NUS pour
# "Nordic UART Service").
# Sources :
# https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_uart_peripheral.py
# Attente active, envoi de l'adresse MAC et réception continue de chaines de caractères
import bluetooth # Classes "primitives du BLE"
from stm32_bleAdvertising import adv_payload # Pour construire la trame d'advertising
from binascii import hexlify # Convertit une donnée binaire en sa représentation hexadécimale
# Constantes requises pour construire le service BLE UART
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
# Définition du service UART avec ses deux caractéristiques RX et TX
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
_FLAG_NOTIFY, # Cette caractéristique notifiera le central des modifications que lui apportera le périphérique
)
_UART_RX = (
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
_FLAG_WRITE, # Le central pourra écrire dans cette caractéristique
)
_UART_SERVICE = (
_UART_UUID,
(_UART_TX, _UART_RX),
)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
# Nombre maximum d'octets qui peuvent être échangés par la caractéristique RX
_MAX_NB_BYTES = const(100)
ascii_mac = None
class BLEUART:
# Initialisations
def __init__(self, ble, name="WB55-UART", rxbuf=_MAX_NB_BYTES):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
# Enregistrement du service
((self._tx_handle, self._rx_handle),) = self._ble.gatts_register_services((_UART_SERVICE,))
# Augmente la taille du tampon rx et active le mode "append"
self._ble.gatts_set_buffer(self._rx_handle, rxbuf, True)
self._connections = set()
self._rx_buffer = bytearray()
self._handler = None
# Advertising du service :
# On peut ajouter en option services=[_UART_UUID], mais cela risque de rendre la payload de la caractéristique trop longue
self._payload = adv_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER)
self._advertise()
# Affiche l'adresse MAC de l'objet
dummy, byte_mac = self._ble.config('mac')
hex_mac = hexlify(byte_mac)
global ascii_mac
ascii_mac = hex_mac.decode("ascii")
print("Adresse MAC : %s" %ascii_mac)
# Interruption pour gérer les réceptions
def irq(self, handler):
self._handler = handler
# Surveille les connexions afin d'envoyer des notifications
def _irq(self, event, data):
# Si un central se connecte
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _ = data
self._connections.add(conn_handle)
# Si un central se déconnecte
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _ = data
if conn_handle in self._connections:
self._connections.remove(conn_handle)
# Redémarre l'advertising pour permettre de nouvelles connexions
self._advertise()
# Lorsqu'un client écrit dans une caractéristique exposée par le serveur
# (gestion des évènements de recéption depuis le central)
elif event == _IRQ_GATTS_WRITE:
conn_handle, value_handle = data
if conn_handle in self._connections and value_handle == self._rx_handle:
self._rx_buffer += self._ble.gatts_read(self._rx_handle)
if self._handler:
self._handler()
# Appelée pour vérifier s'il y a des messages en attente de lecture dans RX
def any(self):
return len(self._rx_buffer)
# Retourne les catactères reçus dans RX
def read(self, sz=None):
if not sz:
sz = len(self._rx_buffer)
result = self._rx_buffer[0:sz]
self._rx_buffer = self._rx_buffer[sz:]
return result
# Ecrit dans TX un message à l'attention du central
def write(self, data):
for conn_handle in self._connections:
self._ble.gatts_notify(conn_handle, self._tx_handle, data)
# Mets fin à la connexion au port série simulé
def close(self):
for conn_handle in self._connections:
self._ble.gap_disconnect(conn_handle)
self._connections.clear()
# Pour démarrer l'advertising, précise qu'un central pourra se connecter au périphérique
def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us, adv_data=self._payload, connectable = True)

View File

@ -0,0 +1,69 @@
# ir_rx __init__.py Decoder for IR remote control using synchronous code
# IR_RX abstract base class for IR receivers.
# Author: Peter Hinch
# Copyright Peter Hinch 2020-2021 Released under the MIT license
from machine import Timer, Pin
from array import array
from utime import ticks_us
# Save RAM
# from micropython import alloc_emergency_exception_buf
# alloc_emergency_exception_buf(100)
# On 1st edge start a block timer. While the timer is running, record the time
# of each edge. When the timer times out decode the data. Duration must exceed
# the worst case block transmission time, but be less than the interval between
# a block start and a repeat code start (~108ms depending on protocol)
class IR_RX():
# Result/error codes
# Repeat button code
REPEAT = -1
# Error codes
BADSTART = -2
BADBLOCK = -3
BADREP = -4
OVERRUN = -5
BADDATA = -6
BADADDR = -7
def __init__(self, pin, nedges, tblock, callback, *args): # Optional args for callback
self._pin = pin
self._nedges = nedges
self._tblock = tblock
self.callback = callback
self.args = args
self._errf = lambda _ : None
self.verbose = False
self._times = array('i', (0 for _ in range(nedges + 1))) # +1 for overrun
pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING))
self.edge = 0
self.tim = Timer(-1) # Sofware timer
self.cb = self.decode
# Pin interrupt. Save time of each edge for later decode.
def _cb_pin(self, line):
t = ticks_us()
# On overrun ignore pulses until software timer times out
if self.edge <= self._nedges: # Allow 1 extra pulse to record overrun
if not self.edge: # First edge received
self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb)
self._times[self.edge] = t
self.edge += 1
def do_callback(self, cmd, addr, ext, thresh=0):
self.edge = 0
if cmd >= thresh:
self.callback(cmd, addr, ext, *self.args)
else:
self._errf(cmd)
def error_function(self, func):
self._errf = func
def close(self):
self._pin.irq(handler = None)
self.tim.deinit()

62
robot/stm32_nec.py Normal file
View File

@ -0,0 +1,62 @@
# nec.py Decoder for IR remote control using synchronous code
# Supports NEC protocol.
# For a remote using NEC see https://www.adafruit.com/products/389
# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license
from utime import ticks_us, ticks_diff
from stm32_ir_receiver import IR_RX
class NEC_ABC(IR_RX):
def __init__(self, pin, extended, callback, *args):
# Block lasts <= 80ms (extended mode) and has 68 edges
super().__init__(pin, 68, 80, callback, *args)
self._extended = extended
self._addr = 0
def decode(self, _):
try:
if self.edge > 68:
raise RuntimeError(self.OVERRUN)
width = ticks_diff(self._times[1], self._times[0])
if width < 4000: # 9ms leading mark for all valid data
raise RuntimeError(self.BADSTART)
width = ticks_diff(self._times[2], self._times[1])
if width > 3000: # 4.5ms space for normal data
if self.edge < 68: # Haven't received the correct number of edges
raise RuntimeError(self.BADBLOCK)
# Time spaces only (marks are always 562.5µs)
# Space is 1.6875ms (1) or 562.5µs (0)
# Skip last bit which is always 1
val = 0
for edge in range(3, 68 - 2, 2):
val >>= 1
if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120:
val |= 0x80000000
elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges.
raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP) # Treat REPEAT as error.
else:
raise RuntimeError(self.BADSTART)
addr = val & 0xff # 8 bit addr
cmd = (val >> 16) & 0xff
if cmd != (val >> 24) ^ 0xff:
raise RuntimeError(self.BADDATA)
if addr != ((val >> 8) ^ 0xff) & 0xff: # 8 bit addr doesn't match check
if not self._extended:
raise RuntimeError(self.BADADDR)
addr |= val & 0xff00 # pass assumed 16 bit address to callback
self._addr = addr
except RuntimeError as e:
cmd = e.args[0]
addr = self._addr if cmd == self.REPEAT else 0 # REPEAT uses last address
# Set up for new data burst and run user callback
self.do_callback(cmd, addr, 0, self.REPEAT)
class NEC_8(NEC_ABC):
def __init__(self, pin, callback, *args):
super().__init__(pin, False, callback, *args)
class NEC_16(NEC_ABC):
def __init__(self, pin, callback, *args):
super().__init__(pin, True, callback, *args)

49
robot/stm32_pcf8574.py Normal file
View File

@ -0,0 +1,49 @@
class PCF8574:
def __init__(self, i2c, addr=0x20):
self._i2c = i2c
i2cModules = self._i2c.scan()
if addr not in i2cModules:
error = "Unable to find module 'PCF8574' at address " + str(hex(addr)) + ". Please check connections with the board.\n"
error += "[Info] I2C address.es detected: " + str([hex(a) for a in i2cModules])
raise ValueError(error)
self._addr = addr
self._port = bytearray(1)
@property
def port(self):
self._read()
return self._port[0]
@port.setter
def port(self, value):
self._port[0] = value & 0xff
self._write()
def pin(self, pin, value=None):
pin = self.validate_pin(pin)
if value is None:
self._read()
return (self._port[0] >> pin) & 1
else:
if value:
self._port[0] |= (1 << (pin))
else:
self._port[0] &= ~(1 << (pin))
self._write()
def toggle(self, pin):
pin = self.validate_pin(pin)
self._port[0] ^= (1 << (pin))
self._write()
def validate_pin(self, pin):
# pin valid range 0..7
if not 0 <= pin <= 7:
raise ValueError('Invalid pin {}. Use 0-7.'.format(pin))
return pin
def _read(self):
self._i2c.readfrom_into(self._addr, self._port)
def _write(self):
self._i2c.writeto(self._addr, self._port)

131
robot/stm32_ssd1306.py Normal file
View File

@ -0,0 +1,131 @@
# MicroPython SSD1306 OLED I2C driver
from micropython import const
import framebuf
import utime
SSD1306_I2C_ADDR = 0x3C
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
SET_DISP, # display off
# address setting
SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE, # start at line 0
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO,
self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET,
0x00,
SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV,
0x80,
SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
SET_CONTRAST,
0xFF, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01, # display on
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP)
def poweron(self):
self.write_cmd(SET_DISP | 0x01)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def rotate(self, rotate):
self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
self.write_cmd(SET_SEG_REMAP | (rotate & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=SSD1306_I2C_ADDR, external_vcc=False):
if i2c == None:
raise ValueError("I2C object 'SSD1306' needed as argument!")
self._i2c = i2c
utime.sleep_ms(200)
i2cModules = self._i2c.scan()
if addr not in i2cModules:
error = "Unable to find module 'SSD1306' at address " + str(hex(addr)) + ". Please check connections with the board.\n"
error += "[Info] I2C address.es detected: " + str([hex(a) for a in i2cModules])
raise ValueError(error)
self._addr = addr
self.temp = bytearray(2)
self.write_list = [b"\x40", None] # Co=0, D/C#=1
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self._i2c.writeto(self._addr, self.temp)
def write_data(self, buf):
self.write_list[1] = buf
self._i2c.writevto(self._addr, self.write_list)

525
robot/stm32_vl53l0x.py Normal file
View File

@ -0,0 +1,525 @@
"""
MicroPython for Grove Time Of Flight VL53L0X sensor (I2C).
https://github.com/vittascience/stm32-libraries
https://wiki.seeedstudio.com/Grove-Time_of_Flight_Distance_Sensor-VL53L0X/
MIT License
Copyright (c) 2020 leomlr (Léo Meillier)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/Vittascience/stm32-libraries"
from micropython import const
import utime
import math
_VL53L0X_IIC_ADDR = const(0x29)
# Configuration constants:
_SYSRANGE_START = const(0x00)
_SYSTEM_THRESH_HIGH = const(0x0C)
_SYSTEM_THRESH_LOW = const(0x0E)
_SYSTEM_SEQUENCE_CONFIG = const(0x01)
_SYSTEM_RANGE_CONFIG = const(0x09)
_SYSTEM_INTERMEASUREMENT_PERIOD = const(0x04)
_SYSTEM_INTERRUPT_CONFIG_GPIO = const(0x0A)
_GPIO_HV_MUX_ACTIVE_HIGH = const(0x84)
_SYSTEM_INTERRUPT_CLEAR = const(0x0B)
_RESULT_INTERRUPT_STATUS = const(0x13)
_RESULT_RANGE_STATUS = const(0x14)
_RESULT_CORE_AMBIENT_WINDOW_EVENTS_RTN = const(0xBC)
_RESULT_CORE_RANGING_TOTAL_EVENTS_RTN = const(0xC0)
_RESULT_CORE_AMBIENT_WINDOW_EVENTS_REF = const(0xD0)
_RESULT_CORE_RANGING_TOTAL_EVENTS_REF = const(0xD4)
_RESULT_PEAK_SIGNAL_RATE_REF = const(0xB6)
_ALGO_PART_TO_PART_RANGE_OFFSET_MM = const(0x28)
_I2C_SLAVE_DEVICE_ADDRESS = const(0x8A)
_MSRC_CONFIG_CONTROL = const(0x60)
_PRE_RANGE_CONFIG_MIN_SNR = const(0x27)
_PRE_RANGE_CONFIG_VALID_PHASE_LOW = const(0x56)
_PRE_RANGE_CONFIG_VALID_PHASE_HIGH = const(0x57)
_PRE_RANGE_MIN_COUNT_RATE_RTN_LIMIT = const(0x64)
_FINAL_RANGE_CONFIG_MIN_SNR = const(0x67)
_FINAL_RANGE_CONFIG_VALID_PHASE_LOW = const(0x47)
_FINAL_RANGE_CONFIG_VALID_PHASE_HIGH = const(0x48)
_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT = const(0x44)
_PRE_RANGE_CONFIG_SIGMA_THRESH_HI = const(0x61)
_PRE_RANGE_CONFIG_SIGMA_THRESH_LO = const(0x62)
_PRE_RANGE_CONFIG_VCSEL_PERIOD = const(0x50)
_PRE_RANGE_CONFIG_TIMEOUT_MACROP_HI = const(0x51)
_PRE_RANGE_CONFIG_TIMEOUT_MACROP_LO = const(0x52)
_SYSTEM_HISTOGRAM_BIN = const(0x81)
_HISTOGRAM_CONFIG_INITIAL_PHASE_SELECT = const(0x33)
_HISTOGRAM_CONFIG_READOUT_CTRL = const(0x55)
_FINAL_RANGE_CONFIG_VCSEL_PERIOD = const(0x70)
_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI = const(0x71)
_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_LO = const(0x72)
_CROSSTALK_COMPENSATION_PEAK_RATE_MCPS = const(0x20)
_MSRC_CONFIG_TIMEOUT_MACROP = const(0x46)
_SOFT_RESET_GO2_SOFT_RESET_N = const(0xBF)
_IDENTIFICATION_MODEL_ID = const(0xC0)
_IDENTIFICATION_REVISION_ID = const(0xC2)
_OSC_CALIBRATE_VAL = const(0xF8)
_GLOBAL_CONFIG_VCSEL_WIDTH = const(0x32)
_GLOBAL_CONFIG_SPAD_ENABLES_REF_0 = const(0xB0)
_GLOBAL_CONFIG_SPAD_ENABLES_REF_1 = const(0xB1)
_GLOBAL_CONFIG_SPAD_ENABLES_REF_2 = const(0xB2)
_GLOBAL_CONFIG_SPAD_ENABLES_REF_3 = const(0xB3)
_GLOBAL_CONFIG_SPAD_ENABLES_REF_4 = const(0xB4)
_GLOBAL_CONFIG_SPAD_ENABLES_REF_5 = const(0xB5)
_GLOBAL_CONFIG_REF_EN_START_SELECT = const(0xB6)
_DYNAMIC_SPAD_NUM_REQUESTED_REF_SPAD = const(0x4E)
_DYNAMIC_SPAD_REF_EN_START_OFFSET = const(0x4F)
_POWER_MANAGEMENT_GO1_POWER_FORCE = const(0x80)
_VHV_CONFIG_PAD_SCL_SDA__EXTSUP_HV = const(0x89)
_ALGO_PHASECAL_LIM = const(0x30)
_ALGO_PHASECAL_CONFIG_TIMEOUT = const(0x30)
_VCSEL_PERIOD_PRE_RANGE = const(0)
_VCSEL_PERIOD_FINAL_RANGE = const(1)
def _decode_timeout(val):
# format: "(LSByte * 2^MSByte) + 1"
return float(val & 0xFF) * math.pow(2.0, ((val & 0xFF00) >> 8)) + 1
def _encode_timeout(timeout_mclks):
# format: "(LSByte * 2^MSByte) + 1"
timeout_mclks = int(timeout_mclks) & 0xFFFF
ls_byte = 0
ms_byte = 0
if timeout_mclks > 0:
ls_byte = timeout_mclks - 1
while ls_byte > 255:
ls_byte >>= 1
ms_byte += 1
return ((ms_byte << 8) | (ls_byte & 0xFF)) & 0xFFFF
return 0
def _timeout_mclks_to_microseconds(timeout_period_mclks, vcsel_period_pclks):
macro_period_ns = ((2304 * (vcsel_period_pclks) * 1655) + 500) // 1000
return ((timeout_period_mclks * macro_period_ns) + (macro_period_ns // 2)) // 1000
def _timeout_microseconds_to_mclks(timeout_period_us, vcsel_period_pclks):
macro_period_ns = ((2304 * (vcsel_period_pclks) * 1655) + 500) // 1000
return ((timeout_period_us * 1000) + (macro_period_ns // 2)) // macro_period_ns
class VL53L0X:
"""Driver for the VL53L0X distance sensor."""
def __init__(self, i2c, address=_VL53L0X_IIC_ADDR, io_timeout_s=0):
# pylint: disable=too-many-statements
self._i2c = i2c
self._addr = address
self.io_timeout_s = io_timeout_s
# Check identification registers for expected values.
# From section 3.2 of the datasheet.
if (
self._read_u8(0xC0) is not 0xEE
or self._read_u8(0xC1) is not 0xAA
or self._read_u8(0xC2) is not 0x10
):
raise RuntimeError("Failed to find expected ID register values. Check wiring!")
# Initialize access to the sensor. This is based on the logic from:
# https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp
# Set I2C standard mode.
for pair in ((0x88, 0x00), (0x80, 0x01), (0xFF, 0x01), (0x00, 0x00)):
self._write_u8(pair[0], pair[1])
self._stop_variable = self._read_u8(0x91)
for pair in ((0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)):
self._write_u8(pair[0], pair[1])
# disable SIGNAL_RATE_MSRC (bit 1) and SIGNAL_RATE_PRE_RANGE (bit 4)
# limit checks
config_control = self._read_u8(_MSRC_CONFIG_CONTROL) | 0x12
self._write_u8(_MSRC_CONFIG_CONTROL, config_control)
# set final range signal rate limit to 0.25 MCPS (million counts per
# second)
self.signal_rate_limit = 0.25
self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xFF)
spad_count, spad_is_aperture = self._get_spad_info()
# The SPAD map (RefGoodSpadMap) is read by
# VL53L0X_get_info_from_device() in the API, but the same data seems to
# be more easily readable from GLOBAL_CONFIG_SPAD_ENABLES_REF_0 through
# _6, so read it from there.
ref_spad_map = bytearray(1)
ref_spad_map[0] = _GLOBAL_CONFIG_SPAD_ENABLES_REF_0
self._i2c.writeto(self._addr, ref_spad_map)
buf = bytearray(6)
self._i2c.readfrom_mem_into(self._addr, ref_spad_map[0], buf)
ref_spad_map.extend(buf)
for pair in (
(0xFF, 0x01),
(_DYNAMIC_SPAD_REF_EN_START_OFFSET, 0x00),
(_DYNAMIC_SPAD_NUM_REQUESTED_REF_SPAD, 0x2C),
(0xFF, 0x00),
(_GLOBAL_CONFIG_REF_EN_START_SELECT, 0xB4),
):
self._write_u8(pair[0], pair[1])
first_spad_to_enable = 12 if spad_is_aperture else 0
spads_enabled = 0
for i in range(48):
if i < first_spad_to_enable or spads_enabled == spad_count:
# This bit is lower than the first one that should be enabled,
# or (reference_spad_count) bits have already been enabled, so
# zero this bit.
ref_spad_map[1 + (i // 8)] &= ~(1 << (i % 8))
elif (ref_spad_map[1 + (i // 8)] >> (i % 8)) & 0x1 > 0:
spads_enabled += 1
self._i2c.writeto(self._addr, ref_spad_map)
for pair in (
(0xFF, 0x01),
(0x00, 0x00),
(0xFF, 0x00),
(0x09, 0x00),
(0x10, 0x00),
(0x11, 0x00),
(0x24, 0x01),
(0x25, 0xFF),
(0x75, 0x00),
(0xFF, 0x01),
(0x4E, 0x2C),
(0x48, 0x00),
(0x30, 0x20),
(0xFF, 0x00),
(0x30, 0x09),
(0x54, 0x00),
(0x31, 0x04),
(0x32, 0x03),
(0x40, 0x83),
(0x46, 0x25),
(0x60, 0x00),
(0x27, 0x00),
(0x50, 0x06),
(0x51, 0x00),
(0x52, 0x96),
(0x56, 0x08),
(0x57, 0x30),
(0x61, 0x00),
(0x62, 0x00),
(0x64, 0x00),
(0x65, 0x00),
(0x66, 0xA0),
(0xFF, 0x01),
(0x22, 0x32),
(0x47, 0x14),
(0x49, 0xFF),
(0x4A, 0x00),
(0xFF, 0x00),
(0x7A, 0x0A),
(0x7B, 0x00),
(0x78, 0x21),
(0xFF, 0x01),
(0x23, 0x34),
(0x42, 0x00),
(0x44, 0xFF),
(0x45, 0x26),
(0x46, 0x05),
(0x40, 0x40),
(0x0E, 0x06),
(0x20, 0x1A),
(0x43, 0x40),
(0xFF, 0x00),
(0x34, 0x03),
(0x35, 0x44),
(0xFF, 0x01),
(0x31, 0x04),
(0x4B, 0x09),
(0x4C, 0x05),
(0x4D, 0x04),
(0xFF, 0x00),
(0x44, 0x00),
(0x45, 0x20),
(0x47, 0x08),
(0x48, 0x28),
(0x67, 0x00),
(0x70, 0x04),
(0x71, 0x01),
(0x72, 0xFE),
(0x76, 0x00),
(0x77, 0x00),
(0xFF, 0x01),
(0x0D, 0x01),
(0xFF, 0x00),
(0x80, 0x01),
(0x01, 0xF8),
(0xFF, 0x01),
(0x8E, 0x01),
(0x00, 0x01),
(0xFF, 0x00),
(0x80, 0x00),
):
self._write_u8(pair[0], pair[1])
self._write_u8(_SYSTEM_INTERRUPT_CONFIG_GPIO, 0x04)
gpio_hv_mux_active_high = self._read_u8(_GPIO_HV_MUX_ACTIVE_HIGH)
self._write_u8(
_GPIO_HV_MUX_ACTIVE_HIGH, gpio_hv_mux_active_high & ~0x10
) # active low
self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01)
self._measurement_timing_budget_us = self.measurement_timing_budget
self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xE8)
self.measurement_timing_budget = self._measurement_timing_budget_us
self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0x01)
self._perform_single_ref_calibration(0x40)
self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0x02)
self._perform_single_ref_calibration(0x00)
# "restore the previous Sequence Config"
self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xE8)
def _read_u8(self, address):
# Read an 8-bit unsigned value from the specified 8-bit address.
buf = self._i2c.readfrom_mem(self._addr, address, 1)
return buf[0]
def _read_u16(self, address):
# Read a 16-bit BE unsigned value from the specified 8-bit address.
buf = self._i2c.readfrom_mem(self._addr, address, 2)
return (buf[0] << 8) | buf[1]
def _write_u8(self, address, val):
# Write an 8-bit unsigned value to the specified 8-bit address.
self._i2c.writeto(self._addr, bytearray([address & 0xFF, val & 0xFF]))
def _write_u16(self, address, val):
# Write a 16-bit BE unsigned value to the specified 8-bit address.
self._i2c.writeto(self._addr, bytearray([address & 0xFF, (val >> 8) & 0xFF, val & 0xFF]))
def _get_spad_info(self):
# Get reference SPAD count and type, returned as a 2-tuple of
# count and boolean is_aperture. Based on code from:
# https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp
for pair in ((0x80, 0x01), (0xFF, 0x01), (0x00, 0x00), (0xFF, 0x06)):
self._write_u8(pair[0], pair[1])
self._write_u8(0x83, self._read_u8(0x83) | 0x04)
for pair in (
(0xFF, 0x07),
(0x81, 0x01),
(0x80, 0x01),
(0x94, 0x6B),
(0x83, 0x00),
):
self._write_u8(pair[0], pair[1])
start = utime.gmtime()
while self._read_u8(0x83) == 0x00:
if (
self.io_timeout_s > 0
and (utime.gmtime() - start) >= self.io_timeout_s
):
raise RuntimeError("Timeout waiting for VL53L0X!")
self._write_u8(0x83, 0x01)
tmp = self._read_u8(0x92)
count = tmp & 0x7F
is_aperture = ((tmp >> 7) & 0x01) == 1
for pair in ((0x81, 0x00), (0xFF, 0x06)):
self._write_u8(pair[0], pair[1])
self._write_u8(0x83, self._read_u8(0x83) & ~0x04)
for pair in ((0xFF, 0x01), (0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)):
self._write_u8(pair[0], pair[1])
return (count, is_aperture)
def _perform_single_ref_calibration(self, vhv_init_byte):
# based on VL53L0X_perform_single_ref_calibration() from ST API.
self._write_u8(_SYSRANGE_START, 0x01 | vhv_init_byte & 0xFF)
start = utime.gmtime()
while (self._read_u8(_RESULT_INTERRUPT_STATUS) & 0x07) == 0:
if (
self.io_timeout_s > 0
and (utime.gmtime() - start) >= self.io_timeout_s
):
raise RuntimeError("Timeout waiting for VL53L0X!")
self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01)
self._write_u8(_SYSRANGE_START, 0x00)
def _get_vcsel_pulse_period(self, vcsel_period_type):
# pylint: disable=no-else-return
# Disable should be removed when refactor can be tested
if vcsel_period_type == _VCSEL_PERIOD_PRE_RANGE:
val = self._read_u8(_PRE_RANGE_CONFIG_VCSEL_PERIOD)
return (((val) + 1) & 0xFF) << 1
elif vcsel_period_type == _VCSEL_PERIOD_FINAL_RANGE:
val = self._read_u8(_FINAL_RANGE_CONFIG_VCSEL_PERIOD)
return (((val) + 1) & 0xFF) << 1
return 255
def _get_sequence_step_enables(self):
# based on VL53L0X_GetSequenceStepEnables() from ST API
sequence_config = self._read_u8(_SYSTEM_SEQUENCE_CONFIG)
tcc = (sequence_config >> 4) & 0x1 > 0
dss = (sequence_config >> 3) & 0x1 > 0
msrc = (sequence_config >> 2) & 0x1 > 0
pre_range = (sequence_config >> 6) & 0x1 > 0
final_range = (sequence_config >> 7) & 0x1 > 0
return (tcc, dss, msrc, pre_range, final_range)
def _get_sequence_step_timeouts(self, pre_range):
# based on get_sequence_step_timeout() from ST API but modified by
# pololu here:
# https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp
pre_range_vcsel_period_pclks = self._get_vcsel_pulse_period(
_VCSEL_PERIOD_PRE_RANGE
)
msrc_dss_tcc_mclks = (self._read_u8(_MSRC_CONFIG_TIMEOUT_MACROP) + 1) & 0xFF
msrc_dss_tcc_us = _timeout_mclks_to_microseconds(
msrc_dss_tcc_mclks, pre_range_vcsel_period_pclks
)
pre_range_mclks = _decode_timeout(
self._read_u16(_PRE_RANGE_CONFIG_TIMEOUT_MACROP_HI)
)
pre_range_us = _timeout_mclks_to_microseconds(
pre_range_mclks, pre_range_vcsel_period_pclks
)
final_range_vcsel_period_pclks = self._get_vcsel_pulse_period(
_VCSEL_PERIOD_FINAL_RANGE
)
final_range_mclks = _decode_timeout(
self._read_u16(_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI)
)
if pre_range:
final_range_mclks -= pre_range_mclks
final_range_us = _timeout_mclks_to_microseconds(
final_range_mclks, final_range_vcsel_period_pclks
)
return (
msrc_dss_tcc_us,
pre_range_us,
final_range_us,
final_range_vcsel_period_pclks,
pre_range_mclks,
)
@property
def signal_rate_limit(self):
"""The signal rate limit in mega counts per second."""
val = self._read_u16(_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT)
# Return value converted from 16-bit 9.7 fixed point to float.
return val / (1 << 7)
@signal_rate_limit.setter
def signal_rate_limit(self, val):
assert 0.0 <= val <= 511.99
# Convert to 16-bit 9.7 fixed point value from a float.
val = int(val * (1 << 7))
self._write_u16(_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT, val)
@property
def measurement_timing_budget(self):
"""The measurement timing budget in microseconds."""
budget_us = 1910 + 960 # Start overhead + end overhead.
tcc, dss, msrc, pre_range, final_range = self._get_sequence_step_enables()
step_timeouts = self._get_sequence_step_timeouts(pre_range)
msrc_dss_tcc_us, pre_range_us, final_range_us, _, _ = step_timeouts
if tcc:
budget_us += msrc_dss_tcc_us + 590
if dss:
budget_us += 2 * (msrc_dss_tcc_us + 690)
elif msrc:
budget_us += msrc_dss_tcc_us + 660
if pre_range:
budget_us += pre_range_us + 660
if final_range:
budget_us += final_range_us + 550
self._measurement_timing_budget_us = budget_us
return budget_us
@measurement_timing_budget.setter
def measurement_timing_budget(self, budget_us):
# pylint: disable=too-many-locals
assert budget_us >= 20000
used_budget_us = 1320 + 960 # Start (diff from get) + end overhead
tcc, dss, msrc, pre_range, final_range = self._get_sequence_step_enables()
step_timeouts = self._get_sequence_step_timeouts(pre_range)
msrc_dss_tcc_us, pre_range_us, _ = step_timeouts[:3]
final_range_vcsel_period_pclks, pre_range_mclks = step_timeouts[3:]
if tcc:
used_budget_us += msrc_dss_tcc_us + 590
if dss:
used_budget_us += 2 * (msrc_dss_tcc_us + 690)
elif msrc:
used_budget_us += msrc_dss_tcc_us + 660
if pre_range:
used_budget_us += pre_range_us + 660
if final_range:
used_budget_us += 550
# "Note that the final range timeout is determined by the timing
# budget and the sum of all other timeouts within the sequence.
# If there is no room for the final range timeout, then an error
# will be set. Otherwise the remaining time will be applied to
# the final range."
if used_budget_us > budget_us:
raise ValueError("Requested timeout too big.")
final_range_timeout_us = budget_us - used_budget_us
final_range_timeout_mclks = _timeout_microseconds_to_mclks(
final_range_timeout_us, final_range_vcsel_period_pclks
)
if pre_range:
final_range_timeout_mclks += pre_range_mclks
self._write_u16(
_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI,
_encode_timeout(final_range_timeout_mclks),
)
self._measurement_timing_budget_us = budget_us
def getRangeMillimeters(self):
"""Perform a single reading of the range for an object in front of
the sensor and return the distance in millimeters.
"""
# Adapted from readRangeSingleMillimeters &
# readRangeContinuousMillimeters in pololu code at:
# https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp
for pair in (
(0x80, 0x01),
(0xFF, 0x01),
(0x00, 0x00),
(0x91, self._stop_variable),
(0x00, 0x01),
(0xFF, 0x00),
(0x80, 0x00),
(_SYSRANGE_START, 0x01),
):
self._write_u8(pair[0], pair[1])
start = utime.gmtime()
while (self._read_u8(_SYSRANGE_START) & 0x01) > 0:
if (self.io_timeout_s > 0 and (utime.gmtime() - start) >= self.io_timeout_s):
raise RuntimeError("Timeout waiting for VL53L0X!")
start = utime.gmtime()
while (self._read_u8(_RESULT_INTERRUPT_STATUS) & 0x07) == 0:
if (self.io_timeout_s > 0 and (utime.gmtime() - start) >= self.io_timeout_s):
raise RuntimeError("Timeout waiting for VL53L0X!")
# assumptions: Linearity Corrective Gain is 1000 (default)
# fractional ranging is not enabled
range_mm = self._read_u16(_RESULT_RANGE_STATUS + 10)
self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01)
return range_mm
def set_address(self, new_address):
"""Set a new I2C address to the instantaited object. This is only called when using
multiple VL53L0X sensors on the same I2C bus (SDA & SCL pins). See also the
`example <examples.html#multiple-vl53l0x-on-same-i2c-bus>`_ for proper usage.
:param int new_address: The 7-bit `int` that is to be assigned to the VL53L0X sensor.
The address that is assigned should NOT be already in use by another device on the
I2C bus.
.. important:: To properly set the address to an individual VL53L0X sensor, you must
first ensure that all other VL53L0X sensors (using the default address of ``0x29``)
on the same I2C bus are in their off state by pulling the "SHDN" pins LOW. When the
"SHDN" pin is pulled HIGH again the default I2C address is ``0x29``.
"""
self._i2c.write(_I2C_SLAVE_DEVICE_ADDRESS, new_address & 0x7F)