diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e549fd1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__* +.metadata +*.asm.bin diff --git a/Epreuve3.py b/Epreuve3.py index 535170f..df6353c 100644 --- a/Epreuve3.py +++ b/Epreuve3.py @@ -227,6 +227,7 @@ class Simulator: print(f"PC={pc_before:02X} {instr:20s} +Cycles={cycles_added:3d} Total={c.cycles}") print(f" {regs_str} LT={c.lt} EQ={c.eq} SP={c.sp}") print("-" * 60) + ram = self.dump_ram() return { "pc": pc_before, "instr": instr, @@ -234,7 +235,8 @@ class Simulator: "regs": c.regs.copy(), "lt": c.lt, "eq": c.eq, - "sp": c.sp + "sp": c.sp, + "ram": ram } @@ -248,6 +250,15 @@ class Simulator: steps += 1 time.sleep(0.05) + def dump_ram(self): + print("\n========= RAM DUMP ========") + for addr in range(0, 256, 8): + chunk = self.ram[addr:addr+8] + hex_values = " ".join(f"{b:02X}" for b in chunk) + print(f"{addr:02X}: {hex_values}") + return self.ram + print("===========================\n") + # --------------------------------------------------------- # LECTURE D'UN FICHIER .bin ET LANCEMENT diff --git a/programme.bin b/programme.bin deleted file mode 100644 index d6049e9..0000000 Binary files a/programme.bin and /dev/null differ diff --git a/programme.py b/programme.py deleted file mode 100644 index e649a52..0000000 --- a/programme.py +++ /dev/null @@ -1,17 +0,0 @@ -program = bytes([ - 0b10100001, # PUSH R1 - 0b11100000, 0b00001111, # MOV R0, 15 - 0b11100001, 0b00000001, # MOV R1, 1 - 0b11010001, # SUB R0, R1 - 0b10010000, 0b00000000, # CMP R0, 0 - 0b00100000, 0b00001110, # JEQ 14 - 0b01000000, 0b00000101, # JMP 5 - 0b00000000, # DB 0 - 0b01000011, # 'C' - 0b01100001, # POP R1 - 0b11100000, 0b01000001, # MOV R0, 'A' - 0b10000000 # RET -]) - -with open("programme.bin", "wb") as f: - f.write(program) diff --git a/pybcdc.inf b/pybcdc.inf deleted file mode 100644 index a131ec7..0000000 --- a/pybcdc.inf +++ /dev/null @@ -1,92 +0,0 @@ -; Windows USB CDC ACM Setup File -; Based on INF files which were: -; Copyright (c) 2000 Microsoft Corporation -; Copyright (C) 2007 Microchip Technology Inc. -; Likely to be covered by the MLPL as found at: -; . - -[Version] -Signature="$Windows NT$" -Class=Ports -ClassGuid={4D36E978-E325-11CE-BFC1-08002BE10318} -Provider=%MFGNAME% -LayoutFile=layout.inf -DriverVer=03/11/2010,5.1.2600.3 - -[Manufacturer] -%MFGNAME%=DeviceList, NTamd64 - -[DestinationDirs] -DefaultDestDir=12 - -;--------------------------------------------------------------------- -; Windows 2000/XP/Server2003/Vista/Server2008/7 - 32bit Sections - -[DriverInstall.nt] -include=mdmcpq.inf -CopyFiles=DriverCopyFiles.nt -AddReg=DriverInstall.nt.AddReg - -[DriverCopyFiles.nt] -usbser.sys,,,0x20 - -[DriverInstall.nt.AddReg] -HKR,,DevLoader,,*ntkern -HKR,,NTMPDriver,,usbser.sys -HKR,,EnumPropPages32,,"MsPorts.dll,SerialPortPropPageProvider" - -[DriverInstall.nt.Services] -AddService=usbser, 0x00000002, DriverService.nt - -[DriverService.nt] -DisplayName=%SERVICE% -ServiceType=1 -StartType=3 -ErrorControl=1 -ServiceBinary=%12%\usbser.sys - -;--------------------------------------------------------------------- -; Windows XP/Server2003/Vista/Server2008/7 - 64bit Sections - -[DriverInstall.NTamd64] -include=mdmcpq.inf -CopyFiles=DriverCopyFiles.NTamd64 -AddReg=DriverInstall.NTamd64.AddReg - -[DriverCopyFiles.NTamd64] -usbser.sys,,,0x20 - -[DriverInstall.NTamd64.AddReg] -HKR,,DevLoader,,*ntkern -HKR,,NTMPDriver,,usbser.sys -HKR,,EnumPropPages32,,"MsPorts.dll,SerialPortPropPageProvider" - -[DriverInstall.NTamd64.Services] -AddService=usbser, 0x00000002, DriverService.NTamd64 - -[DriverService.NTamd64] -DisplayName=%SERVICE% -ServiceType=1 -StartType=3 -ErrorControl=1 -ServiceBinary=%12%\usbser.sys - -;--------------------------------------------------------------------- -; Vendor and Product ID Definitions - -[SourceDisksFiles] -[SourceDisksNames] -[DeviceList] -%DESCRIPTION%=DriverInstall, USB\VID_f055&PID_9800&MI_00, USB\VID_f055&PID_9800&MI_01, USB\VID_f055&PID_9801&MI_00, USB\VID_f055&PID_9801&MI_01, USB\VID_f055&PID_9802 - -[DeviceList.NTamd64] -%DESCRIPTION%=DriverInstall, USB\VID_f055&PID_9800&MI_00, USB\VID_f055&PID_9800&MI_01, USB\VID_f055&PID_9801&MI_00, USB\VID_f055&PID_9801&MI_01, USB\VID_f055&PID_9802 - -;--------------------------------------------------------------------- -; String Definitions - -[Strings] -MFGFILENAME="pybcdc" -MFGNAME="MicroPython" -DESCRIPTION="Pyboard USB Comm Port" -SERVICE="USB Serial Driver" diff --git a/simulateur_front.py b/simulateur_front.py index 0f15558..0208636 100644 --- a/simulateur_front.py +++ b/simulateur_front.py @@ -6,19 +6,23 @@ import subprocess import sys root = tk.Tk() +root.title("Simulateur Epreuve 3 - 24H du Code 2026") +image = tk.PhotoImage(file="SII++.png") +#root.configure(bg="#0059A3") +root.configure(bg="#89B4E1") # Widgets are added here #Frame principale -frame = tk.Frame(root, width=800, height=400) +frame = tk.Frame(root, width=800, height=400, background="#89B4E1") frame.pack(padx=10, pady=10) #Frames Corps -instructions_frame = tk.Frame(frame, width=400, height=400, bg="grey") +instructions_frame = tk.Frame(frame, width=400, height=400, bg="#89B4E1") instructions_frame.pack(padx=5, pady=5, side=tk.LEFT, fill=Y) -infos_frames = tk.Frame(frame, width=400, height=400, bg="green") +infos_frames = tk.Frame(frame, width=400, height=400, bg="#89B4E1") infos_frames.pack(padx=5, pady=5, side=tk.RIGHT) @@ -42,18 +46,32 @@ scrollbar.config(command=mylist.yview) #Frame instruction suivante -single_inst_frame = tk.Frame(instructions_frame, width=400, height=100, bg="grey") -single_inst_frame.pack(padx=5, pady=5, side=tk.BOTTOM) +single_inst_frame = tk.Frame(instructions_frame, width=400, height=100, bg="#89B4E1") +single_inst_frame.pack(padx=5, pady=5, side=tk.TOP) Label(single_inst_frame, text=f"Instruction").pack(pady=5, side=LEFT) single_instr = Label(single_inst_frame, text=f"instruction+cycle", width=30, bg="white") single_instr.pack(pady=5, side=tk.LEFT) +# Boutons +buttons_frame = tk.Frame(instructions_frame, width=400, height=100, bg="#89B4E1") +buttons_frame.pack(padx=5, pady=5, side=tk.TOP) +buttonAll = tk.Button(root, text="All In") +buttonStep = tk.Button(root, text="Step By Step") + +buttonAll.pack(in_=buttons_frame, side=tk.LEFT) +buttonStep.pack(in_=buttons_frame, side=tk.LEFT) + +image_frame = tk.Frame(instructions_frame, width=400, height=100, bg="#89B4E1") +image_frame.pack(padx=5, pady=5, side=tk.TOP) + +display_image = image.subsample(10, 10) +tk.Label(image_frame, image=display_image, bg="#89B4E1").pack(padx=5, pady=5) #Frame registres -registres_frames = tk.Frame(infos_frames, width=400, height=100, bg="blue") +registres_frames = tk.Frame(infos_frames, width=400, height=100, bg="#89B4E1") registres_frames.pack(padx=10, pady=10, side=tk.TOP) @@ -101,7 +119,7 @@ label_registre_4.pack(pady=5) #Frame annexes -annexes_frames = tk.Frame(infos_frames, width=400, height=100, bg="blue") +annexes_frames = tk.Frame(infos_frames, width=400, height=100, bg="#89B4E1") annexes_frames.pack(padx=10, pady=10, side=tk.TOP) @@ -219,6 +237,12 @@ def update_gui(): # Mettre à jour la valeur précédente de SP pour la prochaine itération previous_sp = state['sp'] + ram = state['ram'] + + for addr in range(0, 256, 8): + chunk = ram[addr:addr+8] + hex_values = " ".join(f"{b:02X}" for b in chunk) + myRamList.insert(tk.END, f"{addr:02X}: {hex_values}") # relance automatique root.after(200, update_gui) diff --git a/test.py b/test.py deleted file mode 100644 index 4b49ee5..0000000 --- a/test.py +++ /dev/null @@ -1,11 +0,0 @@ -import sys - - -path = "" -args = sys.argv -if (len(args) > 1): - path = args[1] -else: - print("NEED PATH !!!") - exit(0) -print(path) diff --git a/tmp/Interpreteur.py b/tmp/Interpreteur.py deleted file mode 100644 index 76cf74b..0000000 --- a/tmp/Interpreteur.py +++ /dev/null @@ -1,293 +0,0 @@ -# --------------------------------------------------------- -# Simulateur -# --------------------------------------------------------- -# - Bus données : 8 bits -# - 4 registres R0..R3 (8 bits) -# - Bus adresse : 8 bits -# - RAM : 256 octets -# - Instructions : 1 ou 2 octets -# - Cycles : 1 octet -> 1, 2 octets -> 2, LDR/STR -> 3 -# - PC démarre à 0 -# - Pile descendante, SP=255 -# --------------------------------------------------------- - -import sys, time - - -class CPU: - pc: int = 0 - sp: int = 255 - regs: list = [0, 0, 0, 0] # R0..R3 - lt: int = 0 # flag LT - eq: int = 0 # flag EQ - cycles: int = 0 - running: bool = True - after_ret: bool = False - - def __post_init__(self): - if not self.regs: - self.regs = [0, 0, 0, 0] - - -class Simulator: - def __init__(self, program: bytes): - self.ram = bytearray(256) - for i, b in enumerate(program[:256]): - self.ram[i] = b - self.cpu = CPU() - self.program_size = len(program) - self.motorCallback = None - - # ---- Getter / Setter pour interfacer le robot - - def setMotorCallback(self, callback): - self.motorCallback = callback - - - # ----------------- utilitaires mémoire / pile ----------------- - - def fetch_byte(self) -> int: - b = self.ram[self.cpu.pc] - self.cpu.pc = (self.cpu.pc + 1) & 0xFF - return b - - def push(self, value: int): - if self.cpu.sp < 0: - raise RuntimeError("STACK OVERFLOW") - self.ram[self.cpu.sp] = value & 0xFF - self.cpu.sp -= 1 - - def pop(self) -> int: - if self.cpu.sp >= 255: - return 0 - self.cpu.sp += 1 - return self.ram[self.cpu.sp] - - # ----------------- exécution d'une instruction ----------------- - - def step(self): - c = self.cpu - pc_before = c.pc - b = self.fetch_byte() - - instr = "" - size = 1 # taille en octets (1 ou 2) - extra_cycles = 0 # pour LDR/STR/TIM - - # --- instructions 2 octets à opcode fixe --- - #print(pc_before) - #print(self.program_size) - if c.after_ret: - instr = f"DB 0x{b:02X}" - - elif b == 0x00: # CALL _label - addr = self.fetch_byte() - size = 2 - instr = f"CALL {addr}" - self.push(c.pc) - c.pc = addr - - elif b == 0x40: # JMP _label - addr = self.fetch_byte() - size = 2 - instr = f"JMP {addr}" - c.pc = addr - - elif b == 0xC0: # JLT _label - addr = self.fetch_byte() - size = 2 - instr = f"JLT {addr}" - if c.lt == 1: - c.pc = addr - - elif b == 0x20: # JEQ _label - addr = self.fetch_byte() - size = 2 - instr = f"JEQ {addr}" - if c.eq == 1: - c.pc = addr - - elif b == 0x80: # RET - instr = "RET" - ret = self.pop() - if c.sp >= 255 and ret == 0: - c.after_ret = True - c.running = False - else: - c.pc = ret - - # --- PUSH / POP --- - elif (b & 0b11111100) == 0b10100000: # PUSH Rx - r = b & 0b11 - instr = f"PUSH R{r}" - self.push(c.regs[r]) - - elif (b & 0b11111100) == 0b01100000: # POP Rx - r = b & 0b11 - instr = f"POP R{r}" - c.regs[r] = self.pop() - - # --- MOV Rx valeur / SUB Rx valeur / CMP Rx valeur --- - elif (b & 0b11111100) == 0b11100000: # MOV Rx valeur - r = b & 0b11 - imm = self.fetch_byte() - size = 2 - instr = f"MOV R{r}, {imm}" - c.regs[r] = imm - - elif (b & 0b11111100) == 0b00010000: # SUB Rx valeur - r = b & 0b11 - imm = self.fetch_byte() - size = 2 - instr = f"SUB R{r}, {imm}" - c.regs[r] = (c.regs[r] - imm) & 0xFF - - elif (b & 0b11111100) == 0b10010000: # CMP Rx valeur - r = b & 0b11 - imm = self.fetch_byte() - size = 2 - instr = f"CMP R{r}, {imm}" - v = c.regs[r] - c.lt = 1 if v < imm else 0 - c.eq = 1 if v == imm else 0 - - # --- MOV / SUB / CMP registre-registre --- - elif (b & 0b11110000) == 0b01010000: # MOV Rx Ry - dst = (b >> 2) & 0b11 - src = b & 0b11 - instr = f"MOV R{dst}, R{src}" - c.regs[dst] = c.regs[src] - - elif (b & 0b11110000) == 0b11010000: # SUB Rx Ry - dst = (b >> 2) & 0b11 - src = b & 0b11 - instr = f"SUB R{dst}, R{src}" - c.regs[dst] = (c.regs[dst] - c.regs[src]) & 0xFF - - elif (b & 0b11110000) == 0b00110000: # CMP Rx Ry - dst = (b >> 2) & 0b11 - src = b & 0b11 - instr = f"CMP R{dst}, R{src}" - v1 = c.regs[dst] - v2 = c.regs[src] - c.lt = 1 if v1 < v2 else 0 - c.eq = 1 if v1 == v2 else 0 - - # --- LDR / STR (2 octets, 3 cycles) --- - elif (b & 0b11110000) == 0b10110000: # LDR Rx Ry _label - dst = (b >> 2) & 0b11 - src = b & 0b11 - addr = self.fetch_byte() - size = 2 - instr = f"LDR R{dst}, R{src}, {addr}" - eff = (addr + c.regs[src]) & 0xFF - c.regs[dst] = self.ram[eff] - extra_cycles = 1 # 2 octets -> 2 cycles +1 = 3 - - elif (b & 0b11110000) == 0b01110000: # STR Rx Ry _label - dst = (b >> 2) & 0b11 - src = b & 0b11 - addr = self.fetch_byte() - size = 2 - instr = f"STR R{dst}, R{src}, {addr}" - eff = (addr + c.regs[src]) & 0xFF - self.ram[eff] = c.regs[dst] & 0xFF - extra_cycles = 1 - - # --- OUT Rx --- - elif (b & 0b11111100) == 0b11110000: # OUT Rx - r = b & 0b11 - instr = f"OUT R{r}" - registre = c.regs[r] - print(f"[OUT] R{r} = {registre}") - if (self.motorCallback != None): - motG = (registre >> 4) & 0b1111 - motD = (registre) & 0b1111 - self.motorCallback(motG, motD) - - # --- TIM valeur --- - elif b == 0xF8: # TIM - second = self.fetch_byte() - size = 2 - m = (second >> 7) & 0x1 - v = second & 0x7F - instr = f"TIM m={m}, v={v}" - mult = 1 if m == 0 else 100 - pause_ms = mult * (v + 1) - c.cycles += pause_ms # modélisation de la pause - print(f"Sleep {pause_ms}ms...") - time.sleep(pause_ms/1000) - print("BIPBIP") - - # if pc_before >= self.program_size: - # if 32 <= b <= 126: - # instr = f"DB 0x{b:02X} ('{chr(b)}')" - # else: - # instr = f"DB 0x{b:02X}" - else: - instr = f"UNKNOWN 0x{b:02X}" - c.running = False - - - - # calcul des cycles - if (b & 0b11110000) in (0xB0, 0x70): # LDR / STR - c.cycles += 3 - cycles_added = 3 - else: - c.cycles += size - cycles_added = size - - self.report(pc_before, instr, cycles_added) - - # ----------------- rapport d'exécution ----------------- - - def report(self, pc_before: int, instr: str, cycles_added: int): - c = self.cpu - regs_str = " ".join(f"R{i}={c.regs[i]:02X}" for i in range(4)) - print(f"PC={pc_before:02X} {instr:20s} +Cycles={cycles_added:3d} Total={c.cycles}") - print(f" {regs_str} LT={c.lt} EQ={c.eq} SP={c.sp}") - print("-" * 60) - - # ----------------- boucle principale ----------------- - - def run(self, max_steps: int = 100000): - steps = 0 - while self.cpu.running and steps < max_steps: - self.step() - steps += 1 - - - -def motorCallback(motG, motD): - sMotG = 1 - ((motG >> 2) & 0b10) - sMotD = 1 - ((motD >> 2) & 0b10) - motG = ((motG & 0b0111) * sMotG) * 100 / 7 - motD = ((motD & 0b0111) * sMotD) * 100 / 7 - print("Mot G :", motG) - print("Mot D :", motD) - - -def StartCPU(filename, callback): - with open(filename, "rb") as f: - program = f.read() - sim = Simulator(program) - sim.setMotorCallback(callback) - while sim.cpu.running: - sim.run(max_steps = 1) - time.sleep(0.1) - -import time -# --------------------------------------------------------- -# LECTURE D'UN FICHIER .bin ET LANCEMENT -# --------------------------------------------------------- -if __name__ == "__main__": - # Nom du fichier binaire à exécuter - path ="" - args= sys.argv - if (len(args) > 1): - filename = args[1] - print("filename: " + filename) - StartCPU(filename, motorCallback) - else: - print("Needs *.bin as parameter") diff --git a/tmp/README.txt b/tmp/README.txt deleted file mode 100644 index 4e9dc35..0000000 --- a/tmp/README.txt +++ /dev/null @@ -1,12 +0,0 @@ -This is a MicroPython board - -You can get started right away by writing your Python code in 'main.py'. - -For a serial prompt: - - Windows: you need to go to 'Device manager', right click on the unknown device, - then update the driver software, using the 'pybcdc.inf' file found on this drive. - Then use a terminal program like Hyperterminal or putty. - - Mac OS X: use the command: screen /dev/tty.usbmodem* - - Linux: use the command: screen /dev/ttyACM0 - -For online docs please visit http://docs.micropython.org/ diff --git a/tmp/aioble/__init__.py b/tmp/aioble/__init__.py deleted file mode 100644 index dde89f5..0000000 --- a/tmp/aioble/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -# MicroPython aioble module -# MIT license; Copyright (c) 2021 Jim Mussared - -from micropython import const - -from .device import Device, DeviceDisconnectedError -from .core import log_info, log_warn, log_error, GattError, config, stop - -try: - from .peripheral import advertise -except: - log_info("Peripheral support disabled") - -try: - from .central import scan -except: - log_info("Central support disabled") - -try: - from .server import ( - Service, - Characteristic, - BufferedCharacteristic, - Descriptor, - register_services, - ) -except: - log_info("GATT server support disabled") - - -ADDR_PUBLIC = const(0) -ADDR_RANDOM = const(1) diff --git a/tmp/aioble/central.py b/tmp/aioble/central.py deleted file mode 100644 index adfc972..0000000 --- a/tmp/aioble/central.py +++ /dev/null @@ -1,297 +0,0 @@ -# MicroPython aioble module -# MIT license; Copyright (c) 2021 Jim Mussared - -from micropython import const - -import bluetooth -import struct - -import uasyncio as asyncio - -from .core import ( - ensure_active, - ble, - log_info, - log_error, - log_warn, - register_irq_handler, -) -from .device import Device, DeviceConnection, DeviceTimeout - - -_IRQ_SCAN_RESULT = const(5) -_IRQ_SCAN_DONE = const(6) - -_IRQ_PERIPHERAL_CONNECT = const(7) -_IRQ_PERIPHERAL_DISCONNECT = const(8) - -_ADV_IND = const(0) -_ADV_DIRECT_IND = const(1) -_ADV_SCAN_IND = const(2) -_ADV_NONCONN_IND = const(3) -_SCAN_RSP = const(4) - -_ADV_TYPE_FLAGS = const(0x01) -_ADV_TYPE_NAME = const(0x09) -_ADV_TYPE_SHORT_NAME = const(0x08) -_ADV_TYPE_UUID16_INCOMPLETE = const(0x2) -_ADV_TYPE_UUID16_COMPLETE = const(0x3) -_ADV_TYPE_UUID32_INCOMPLETE = const(0x4) -_ADV_TYPE_UUID32_COMPLETE = const(0x5) -_ADV_TYPE_UUID128_INCOMPLETE = const(0x6) -_ADV_TYPE_UUID128_COMPLETE = const(0x7) -_ADV_TYPE_APPEARANCE = const(0x19) -_ADV_TYPE_MANUFACTURER = const(0xFF) - - -# Keep track of the active scanner so IRQs can be delivered to it. -_active_scanner = None - - -# Set of devices that are waiting for the peripheral connect IRQ. -_connecting = set() - - -def _central_irq(event, data): - # Send results and done events to the active scanner instance. - if event == _IRQ_SCAN_RESULT: - addr_type, addr, adv_type, rssi, adv_data = data - if not _active_scanner: - return - _active_scanner._queue.append((addr_type, bytes(addr), adv_type, rssi, bytes(adv_data))) - _active_scanner._event.set() - elif event == _IRQ_SCAN_DONE: - if not _active_scanner: - return - _active_scanner._done = True - _active_scanner._event.set() - - # Peripheral connect must be in response to a pending connection, so find - # it in the pending connection set. - elif event == _IRQ_PERIPHERAL_CONNECT: - conn_handle, addr_type, addr = data - - for d in _connecting: - if d.addr_type == addr_type and d.addr == addr: - # Allow connect() to complete. - connection = d._connection - connection._conn_handle = conn_handle - connection._event.set() - break - - # Find the active device connection for this connection handle. - elif event == _IRQ_PERIPHERAL_DISCONNECT: - conn_handle, _, _ = data - if connection := DeviceConnection._connected.get(conn_handle, None): - # Tell the device_task that it should terminate. - connection._event.set() - - -def _central_shutdown(): - global _active_scanner, _connecting - _active_scanner = None - _connecting = set() - - -register_irq_handler(_central_irq, _central_shutdown) - - -# Cancel an in-progress scan. -async def _cancel_pending(): - if _active_scanner: - await _active_scanner.cancel() - - -# Start connecting to a peripheral. -# Call device.connect() rather than using method directly. -async def _connect(connection, timeout_ms): - device = connection.device - if device in _connecting: - return - - # Enable BLE and cancel in-progress scans. - ensure_active() - await _cancel_pending() - - # Allow the connected IRQ to find the device by address. - _connecting.add(device) - - # Event will be set in the connected IRQ, and then later - # re-used to notify disconnection. - connection._event = connection._event or asyncio.ThreadSafeFlag() - - try: - with DeviceTimeout(None, timeout_ms): - ble.gap_connect(device.addr_type, device.addr) - - # Wait for the connected IRQ. - await connection._event.wait() - assert connection._conn_handle is not None - - # Register connection handle -> device. - DeviceConnection._connected[connection._conn_handle] = connection - finally: - # After timeout, don't hold a reference and ignore future events. - _connecting.remove(device) - - -# Represents a single device that has been found during a scan. The scan -# iterator will return the same ScanResult instance multiple times as its data -# changes (i.e. changing RSSI or advertising data). -class ScanResult: - def __init__(self, device): - self.device = device - self.adv_data = None - self.resp_data = None - self.rssi = None - self.connectable = False - - # New scan result available, return true if it changes our state. - def _update(self, adv_type, rssi, adv_data): - updated = False - - if rssi != self.rssi: - self.rssi = rssi - updated = True - - if adv_type in (_ADV_IND, _ADV_NONCONN_IND): - if adv_data != self.adv_data: - self.adv_data = adv_data - self.connectable = adv_type == _ADV_IND - updated = True - elif adv_type == _ADV_SCAN_IND: - if adv_data != self.adv_data and self.resp_data: - updated = True - self.adv_data = adv_data - elif adv_type == _SCAN_RSP and adv_data: - if adv_data != self.resp_data: - self.resp_data = adv_data - updated = True - - return updated - - def __str__(self): - return "Scan result: {} {}".format(self.device, self.rssi) - - # Gets all the fields for the specified types. - def _decode_field(self, *adv_type): - # Advertising payloads are repeated packets of the following form: - # 1 byte data length (N + 1) - # 1 byte type (see constants below) - # N bytes type-specific data - for payload in (self.adv_data, self.resp_data): - if not payload: - continue - i = 0 - while i + 1 < len(payload): - if payload[i + 1] in adv_type: - yield payload[i + 2 : i + payload[i] + 1] - i += 1 + payload[i] - - # Returns the value of the complete (or shortened) advertised name, if available. - def name(self): - for n in self._decode_field(_ADV_TYPE_NAME, _ADV_TYPE_SHORT_NAME): - return str(n, "utf-8") if n else "" - - # Generator that enumerates the service UUIDs that are advertised. - def services(self): - for u in self._decode_field(_ADV_TYPE_UUID16_INCOMPLETE, _ADV_TYPE_UUID16_COMPLETE): - yield bluetooth.UUID(struct.unpack(" value_handle else value_handle + 2 - - super().__init__(value_handle, properties, uuid) - - if properties & _FLAG_NOTIFY: - # Fired when a notification arrives. - self._notify_event = asyncio.ThreadSafeFlag() - # Data for the most recent notification. - self._notify_queue = deque((), 1) - if properties & _FLAG_INDICATE: - # Same for indications. - self._indicate_event = asyncio.ThreadSafeFlag() - self._indicate_queue = deque((), 1) - - def __str__(self): - return "Characteristic: {} {} {} {}".format( - self._end_handle, self._value_handle, self.properties, self.uuid - ) - - def _connection(self): - return self.service.connection - - # Search for a specific descriptor by uuid. - async def descriptor(self, uuid, timeout_ms=2000): - result = None - # Make sure loop runs to completion. - async for descriptor in self.descriptors(timeout_ms): - if not result and descriptor.uuid == uuid: - # Keep first result. - result = descriptor - return result - - # Search for all services (optionally by uuid). - # Use with `async for`, e.g. - # async for descriptor in characteristic.descriptors(): - # Note: must allow the loop to run to completion. - def descriptors(self, timeout_ms=2000): - return ClientDiscover(self.connection, ClientDescriptor, self, timeout_ms) - - # For ClientDiscover - def _start_discovery(service, uuid=None): - ble.gattc_discover_characteristics( - service.connection._conn_handle, - service._start_handle, - service._end_handle, - uuid, - ) - - # Helper for notified() and indicated(). - async def _notified_indicated(self, queue, event, timeout_ms): - # Ensure that events for this connection can route to this characteristic. - self._register_with_connection() - - # If the queue is empty, then we need to wait. However, if the queue - # has a single item, we also need to do a no-op wait in order to - # clear the event flag (because the queue will become empty and - # therefore the event should be cleared). - if len(queue) <= 1: - with self._connection().timeout(timeout_ms): - await event.wait() - - # Either we started > 1 item, or the wait completed successfully, return - # the front of the queue. - return queue.popleft() - - # Wait for the next notification. - # Will return immediately if a notification has already been received. - async def notified(self, timeout_ms=None): - self._check(_FLAG_NOTIFY) - return await self._notified_indicated(self._notify_queue, self._notify_event, timeout_ms) - - def _on_notify_indicate(self, queue, event, data): - # If we've gone from empty to one item, then wake something - # blocking on `await char.notified()` (or `await char.indicated()`). - wake = len(queue) == 0 - # Append the data. By default this is a deque with max-length==1, so it - # replaces. But if capture is enabled then it will append. - queue.append(data) - if wake: - # Queue is now non-empty. If something is waiting, it will be - # worken. If something isn't waiting right now, then a future - # caller to `await char.written()` will see the queue is - # non-empty, and wait on the event if it's going to empty the - # queue. - event.set() - - # Map an incoming notify IRQ to a registered characteristic. - def _on_notify(conn_handle, value_handle, notify_data): - if characteristic := ClientCharacteristic._find(conn_handle, value_handle): - characteristic._on_notify_indicate( - characteristic._notify_queue, characteristic._notify_event, notify_data - ) - - # Wait for the next indication. - # Will return immediately if an indication has already been received. - async def indicated(self, timeout_ms=None): - self._check(_FLAG_INDICATE) - return await self._notified_indicated( - self._indicate_queue, self._indicate_event, timeout_ms - ) - - # Map an incoming indicate IRQ to a registered characteristic. - def _on_indicate(conn_handle, value_handle, indicate_data): - if characteristic := ClientCharacteristic._find(conn_handle, value_handle): - characteristic._on_notify_indicate( - characteristic._indicate_queue, characteristic._indicate_event, indicate_data - ) - - # Write to the Client Characteristic Configuration to subscribe to - # notify/indications for this characteristic. - async def subscribe(self, notify=True, indicate=False): - # Ensure that the generated notifications are dispatched in case the app - # hasn't awaited on notified/indicated yet. - self._register_with_connection() - if cccd := await self.descriptor(bluetooth.UUID(_CCCD_UUID)): - await cccd.write(struct.pack(" 0: - print("[aioble] E:", *args) - - -def log_warn(*args): - if log_level > 1: - print("[aioble] W:", *args) - - -def log_info(*args): - if log_level > 2: - print("[aioble] I:", *args) - - -class GattError(Exception): - def __init__(self, status): - self._status = status - - -def ensure_active(): - if not ble.active(): - try: - from .security import load_secrets - - load_secrets() - except: - pass - ble.active(True) - - -def config(*args, **kwargs): - ensure_active() - return ble.config(*args, **kwargs) - - -# Because different functionality is enabled by which files are available the -# different modules can register their IRQ handlers and shutdown handlers -# dynamically. -_irq_handlers = [] -_shutdown_handlers = [] - - -def register_irq_handler(irq, shutdown): - if irq: - _irq_handlers.append(irq) - if shutdown: - _shutdown_handlers.append(shutdown) - - -def stop(): - ble.active(False) - for handler in _shutdown_handlers: - handler() - - -# Dispatch IRQs to the registered sub-modules. -def ble_irq(event, data): - log_info(event, data) - - for handler in _irq_handlers: - result = handler(event, data) - if result is not None: - return result - - -# TODO: Allow this to be injected. -ble = bluetooth.BLE() -ble.irq(ble_irq) diff --git a/tmp/aioble/device.py b/tmp/aioble/device.py deleted file mode 100644 index 265d621..0000000 --- a/tmp/aioble/device.py +++ /dev/null @@ -1,295 +0,0 @@ -# MicroPython aioble module -# MIT license; Copyright (c) 2021 Jim Mussared - -from micropython import const - -import uasyncio as asyncio -import binascii - -from .core import ble, register_irq_handler, log_error - - -_IRQ_MTU_EXCHANGED = const(21) - - -# Raised by `with device.timeout()`. -class DeviceDisconnectedError(Exception): - pass - - -def _device_irq(event, data): - if event == _IRQ_MTU_EXCHANGED: - conn_handle, mtu = data - if device := DeviceConnection._connected.get(conn_handle, None): - device.mtu = mtu - if device._mtu_event: - device._mtu_event.set() - - -register_irq_handler(_device_irq, None) - - -# Context manager to allow an operation to be cancelled by timeout or device -# disconnection. Don't use this directly -- use `with connection.timeout(ms):` -# instead. -class DeviceTimeout: - def __init__(self, connection, timeout_ms): - self._connection = connection - self._timeout_ms = timeout_ms - - # We allow either (or both) connection and timeout_ms to be None. This - # allows this to be used either as a just-disconnect, just-timeout, or - # no-op. - - # This task is active while the operation is in progress. It sleeps - # until the timeout, and then cancels the working task. If the working - # task completes, __exit__ will cancel the sleep. - self._timeout_task = None - - # This is the task waiting for the actual operation to complete. - # Usually this is waiting on an event that will be set() by an IRQ - # handler. - self._task = asyncio.current_task() - - # Tell the connection that if it disconnects, it should cancel this - # operation (by cancelling self._task). - if connection: - connection._timeouts.append(self) - - async def _timeout_sleep(self): - try: - await asyncio.sleep_ms(self._timeout_ms) - except asyncio.CancelledError: - # The operation completed successfully and this timeout task was - # cancelled by __exit__. - return - - # The sleep completed, so we should trigger the timeout. Set - # self._timeout_task to None so that we can tell the difference - # between a disconnect and a timeout in __exit__. - self._timeout_task = None - self._task.cancel() - - def __enter__(self): - if self._timeout_ms: - # Schedule the timeout waiter. - self._timeout_task = asyncio.create_task(self._timeout_sleep()) - - def __exit__(self, exc_type, exc_val, exc_traceback): - # One of five things happened: - # 1 - The operation completed successfully. - # 2 - The operation timed out. - # 3 - The device disconnected. - # 4 - The operation failed for a different exception. - # 5 - The task was cancelled by something else. - - # Don't need the connection to tell us about disconnection anymore. - if self._connection: - self._connection._timeouts.remove(self) - - try: - if exc_type == asyncio.CancelledError: - # Case 2, we started a timeout and it's completed. - if self._timeout_ms and self._timeout_task is None: - raise asyncio.TimeoutError - - # Case 3, we have a disconnected device. - if self._connection and self._connection._conn_handle is None: - raise DeviceDisconnectedError - - # Case 5, something else cancelled us. - # Allow the cancellation to propagate. - return - - # Case 1 & 4. Either way, just stop the timeout task and let the - # exception (if case 4) propagate. - finally: - # In all cases, if the timeout is still running, cancel it. - if self._timeout_task: - self._timeout_task.cancel() - - -class Device: - def __init__(self, addr_type, addr): - # Public properties - self.addr_type = addr_type - self.addr = addr if len(addr) == 6 else binascii.unhexlify(addr.replace(":", "")) - self._connection = None - - def __eq__(self, rhs): - return self.addr_type == rhs.addr_type and self.addr == rhs.addr - - def __hash__(self): - return hash((self.addr_type, self.addr)) - - def __str__(self): - return "Device({}, {}{})".format( - "ADDR_PUBLIC" if self.addr_type == 0 else "ADDR_RANDOM", - self.addr_hex(), - ", CONNECTED" if self._connection else "", - ) - - def addr_hex(self): - return binascii.hexlify(self.addr, ":").decode() - - async def connect(self, timeout_ms=10000): - if self._connection: - return self._connection - - # Forward to implementation in central.py. - from .central import _connect - - await _connect(DeviceConnection(self), timeout_ms) - - # Start the device task that will clean up after disconnection. - self._connection._run_task() - return self._connection - - -class DeviceConnection: - # Global map of connection handle to active devices (for IRQ mapping). - _connected = {} - - def __init__(self, device): - self.device = device - device._connection = self - - self.encrypted = False - self.authenticated = False - self.bonded = False - self.key_size = False - self.mtu = None - - self._conn_handle = None - - # This event is fired by the IRQ both for connection and disconnection - # and controls the device_task. - self._event = None - - # If we're waiting for a pending MTU exchange. - self._mtu_event = None - - # In-progress client discovery instance (e.g. services, chars, - # descriptors) used for IRQ mapping. - self._discover = None - # Map of value handle to characteristic (so that IRQs with - # conn_handle,value_handle can route to them). See - # ClientCharacteristic._find for where this is used. - self._characteristics = {} - - self._task = None - - # DeviceTimeout instances that are currently waiting on this device - # and need to be notified if disconnection occurs. - self._timeouts = [] - - # Fired by the encryption update event. - self._pair_event = None - - # Active L2CAP channel for this device. - # TODO: Support more than one concurrent channel. - self._l2cap_channel = None - - # While connected, this tasks waits for disconnection then cleans up. - async def device_task(self): - assert self._conn_handle is not None - - # Wait for the (either central or peripheral) disconnected irq. - await self._event.wait() - - # Mark the device as disconnected. - del DeviceConnection._connected[self._conn_handle] - self._conn_handle = None - self.device._connection = None - - # Cancel any in-progress operations on this device. - for t in self._timeouts: - t._task.cancel() - - def _run_task(self): - # Event will be already created this if we initiated connection. - self._event = self._event or asyncio.ThreadSafeFlag() - - self._task = asyncio.create_task(self.device_task()) - - async def disconnect(self, timeout_ms=2000): - await self.disconnected(timeout_ms, disconnect=True) - - async def disconnected(self, timeout_ms=60000, disconnect=False): - if not self.is_connected(): - return - - # The task must have been created after successful connection. - assert self._task - - if disconnect: - try: - ble.gap_disconnect(self._conn_handle) - except OSError as e: - log_error("Disconnect", e) - - with DeviceTimeout(None, timeout_ms): - await self._task - - # Retrieve a single service matching this uuid. - async def service(self, uuid, timeout_ms=2000): - result = None - # Make sure loop runs to completion. - async for service in self.services(uuid, timeout_ms): - if not result and service.uuid == uuid: - result = service - return result - - # Search for all services (optionally by uuid). - # Use with `async for`, e.g. - # async for service in device.services(): - # Note: must allow the loop to run to completion. - # TODO: disconnection / timeout - def services(self, uuid=None, timeout_ms=2000): - from .client import ClientDiscover, ClientService - - return ClientDiscover(self, ClientService, self, timeout_ms, uuid) - - async def pair(self, *args, **kwargs): - from .security import pair - - await pair(self, *args, **kwargs) - - def is_connected(self): - return self._conn_handle is not None - - # Use with `with` to simplify disconnection and timeout handling. - def timeout(self, timeout_ms): - return DeviceTimeout(self, timeout_ms) - - async def exchange_mtu(self, mtu=None, timeout_ms=1000): - if not self.is_connected(): - raise ValueError("Not connected") - - if mtu: - ble.config(mtu=mtu) - - self._mtu_event = self._mtu_event or asyncio.ThreadSafeFlag() - ble.gattc_exchange_mtu(self._conn_handle) - with self.timeout(timeout_ms): - await self._mtu_event.wait() - return self.mtu - - # Wait for a connection on an L2CAP connection-oriented-channel. - async def l2cap_accept(self, psm, mtu, timeout_ms=None): - from .l2cap import accept - - return await accept(self, psm, mtu, timeout_ms) - - # Attempt to connect to a listening device. - async def l2cap_connect(self, psm, mtu, timeout_ms=1000): - from .l2cap import connect - - return await connect(self, psm, mtu, timeout_ms) - - # Context manager -- automatically disconnect. - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_traceback): - await self.disconnect() diff --git a/tmp/aioble/l2cap.py b/tmp/aioble/l2cap.py deleted file mode 100644 index 713c441..0000000 --- a/tmp/aioble/l2cap.py +++ /dev/null @@ -1,214 +0,0 @@ -# MicroPython aioble module -# MIT license; Copyright (c) 2021 Jim Mussared - -from micropython import const - -import uasyncio as asyncio - -from .core import ble, log_error, register_irq_handler -from .device import DeviceConnection - - -_IRQ_L2CAP_ACCEPT = const(22) -_IRQ_L2CAP_CONNECT = const(23) -_IRQ_L2CAP_DISCONNECT = const(24) -_IRQ_L2CAP_RECV = const(25) -_IRQ_L2CAP_SEND_READY = const(26) - - -# Once we start listening we're listening forever. (Limitation in NimBLE) -_listening = False - - -def _l2cap_irq(event, data): - if event not in ( - _IRQ_L2CAP_CONNECT, - _IRQ_L2CAP_DISCONNECT, - _IRQ_L2CAP_RECV, - _IRQ_L2CAP_SEND_READY, - ): - return - - # All the L2CAP events start with (conn_handle, cid, ...) - if connection := DeviceConnection._connected.get(data[0], None): - if channel := connection._l2cap_channel: - # Expect to match the cid for this conn handle (unless we're - # waiting for connection in which case channel._cid is None). - if channel._cid is not None and channel._cid != data[1]: - return - - # Update the channel object with new information. - if event == _IRQ_L2CAP_CONNECT: - _, channel._cid, _, channel.our_mtu, channel.peer_mtu = data - elif event == _IRQ_L2CAP_DISCONNECT: - _, _, psm, status = data - channel._status = status - channel._cid = None - connection._l2cap_channel = None - elif event == _IRQ_L2CAP_RECV: - channel._data_ready = True - elif event == _IRQ_L2CAP_SEND_READY: - channel._stalled = False - - # Notify channel. - channel._event.set() - - -def _l2cap_shutdown(): - global _listening - _listening = False - - -register_irq_handler(_l2cap_irq, _l2cap_shutdown) - - -# The channel was disconnected during a send/recvinto/flush. -class L2CAPDisconnectedError(Exception): - pass - - -# Failed to connect to connection (argument is status). -class L2CAPConnectionError(Exception): - pass - - -class L2CAPChannel: - def __init__(self, connection): - if not connection.is_connected(): - raise ValueError("Not connected") - - if connection._l2cap_channel: - raise ValueError("Already has channel") - connection._l2cap_channel = self - - self._connection = connection - - # Maximum size that the other side can send to us. - self.our_mtu = 0 - # Maximum size that we can send. - self.peer_mtu = 0 - - # Set back to None on disconnection. - self._cid = None - # Set during disconnection. - self._status = 0 - - # If true, must wait for _IRQ_L2CAP_SEND_READY IRQ before sending. - self._stalled = False - - # Has received a _IRQ_L2CAP_RECV since the buffer was last emptied. - self._data_ready = False - - self._event = asyncio.ThreadSafeFlag() - - def _assert_connected(self): - if self._cid is None: - raise L2CAPDisconnectedError - - async def recvinto(self, buf, timeout_ms=None): - self._assert_connected() - - # Wait until the data_ready flag is set. This flag is only ever set by - # the event and cleared by this function. - with self._connection.timeout(timeout_ms): - while not self._data_ready: - await self._event.wait() - self._assert_connected() - - self._assert_connected() - - # Extract up to len(buf) bytes from the channel buffer. - n = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, buf) - - # Check if there's still remaining data in the channel buffers. - self._data_ready = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, None) > 0 - - return n - - # Synchronously see if there's data ready. - def available(self): - self._assert_connected() - return self._data_ready - - # Waits until the channel is free and then sends buf. - # If the buffer is larger than the MTU it will be sent in chunks. - async def send(self, buf, timeout_ms=None, chunk_size=None): - self._assert_connected() - offset = 0 - chunk_size = min(self.our_mtu * 2, self.peer_mtu, chunk_size or self.peer_mtu) - mv = memoryview(buf) - while offset < len(buf): - if self._stalled: - await self.flush(timeout_ms) - # l2cap_send returns True if you can send immediately. - self._stalled = not ble.l2cap_send( - self._connection._conn_handle, - self._cid, - mv[offset : offset + chunk_size], - ) - offset += chunk_size - - async def flush(self, timeout_ms=None): - self._assert_connected() - # Wait for the _stalled flag to be cleared by the IRQ. - with self._connection.timeout(timeout_ms): - while self._stalled: - await self._event.wait() - self._assert_connected() - - async def disconnect(self, timeout_ms=1000): - if self._cid is None: - return - - # Wait for the cid to be cleared by the disconnect IRQ. - ble.l2cap_disconnect(self._connection._conn_handle, self._cid) - await self.disconnected(timeout_ms) - - async def disconnected(self, timeout_ms=1000): - with self._connection.timeout(timeout_ms): - while self._cid is not None: - await self._event.wait() - - # Context manager -- automatically disconnect. - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_traceback): - await self.disconnect() - - -# Use connection.l2cap_accept() instead of calling this directly. -async def accept(connection, psm, mtu, timeout_ms): - global _listening - - channel = L2CAPChannel(connection) - - # Start the stack listening if necessary. - if not _listening: - ble.l2cap_listen(psm, mtu) - _listening = True - - # Wait for the connect irq from the remote connection. - with connection.timeout(timeout_ms): - await channel._event.wait() - return channel - - -# Use connection.l2cap_connect() instead of calling this directly. -async def connect(connection, psm, mtu, timeout_ms): - if _listening: - raise ValueError("Can't connect while listening") - - channel = L2CAPChannel(connection) - - with connection.timeout(timeout_ms): - ble.l2cap_connect(connection._conn_handle, psm, mtu) - - # Wait for the connect irq from the remote connection. - # If the connection fails, we get a disconnect event (with status) instead. - await channel._event.wait() - - if channel._cid is not None: - return channel - else: - raise L2CAPConnectionError(channel._status) diff --git a/tmp/aioble/peripheral.py b/tmp/aioble/peripheral.py deleted file mode 100644 index 099f2c5..0000000 --- a/tmp/aioble/peripheral.py +++ /dev/null @@ -1,179 +0,0 @@ -# MicroPython aioble module -# MIT license; Copyright (c) 2021 Jim Mussared - -from micropython import const - -import bluetooth -import struct - -import uasyncio as asyncio - -from .core import ( - ensure_active, - ble, - log_info, - log_error, - log_warn, - register_irq_handler, -) -from .device import Device, DeviceConnection, DeviceTimeout - - -_IRQ_CENTRAL_CONNECT = const(1) -_IRQ_CENTRAL_DISCONNECT = const(2) - - -_ADV_TYPE_FLAGS = const(0x01) -_ADV_TYPE_NAME = const(0x09) -_ADV_TYPE_UUID16_COMPLETE = const(0x3) -_ADV_TYPE_UUID32_COMPLETE = const(0x5) -_ADV_TYPE_UUID128_COMPLETE = const(0x7) -_ADV_TYPE_UUID16_MORE = const(0x2) -_ADV_TYPE_UUID32_MORE = const(0x4) -_ADV_TYPE_UUID128_MORE = const(0x6) -_ADV_TYPE_APPEARANCE = const(0x19) -_ADV_TYPE_MANUFACTURER = const(0xFF) - -_ADV_PAYLOAD_MAX_LEN = const(31) - - -_incoming_connection = None -_connect_event = None - - -def _peripheral_irq(event, data): - global _incoming_connection - - if event == _IRQ_CENTRAL_CONNECT: - conn_handle, addr_type, addr = data - - # Create, initialise, and register the device. - device = Device(addr_type, bytes(addr)) - _incoming_connection = DeviceConnection(device) - _incoming_connection._conn_handle = conn_handle - DeviceConnection._connected[conn_handle] = _incoming_connection - - # Signal advertise() to return the connected device. - _connect_event.set() - - elif event == _IRQ_CENTRAL_DISCONNECT: - conn_handle, _, _ = data - if connection := DeviceConnection._connected.get(conn_handle, None): - # Tell the device_task that it should terminate. - connection._event.set() - - -def _peripheral_shutdown(): - global _incoming_connection, _connect_event - _incoming_connection = None - _connect_event = None - - -register_irq_handler(_peripheral_irq, _peripheral_shutdown) - - -# Advertising payloads are repeated packets of the following form: -# 1 byte data length (N + 1) -# 1 byte type (see constants below) -# N bytes type-specific data -def _append(adv_data, resp_data, adv_type, value): - data = struct.pack("BB", len(value) + 1, adv_type) + value - - if len(data) + len(adv_data) < _ADV_PAYLOAD_MAX_LEN: - adv_data += data - return resp_data - - if len(data) + (len(resp_data) if resp_data else 0) < _ADV_PAYLOAD_MAX_LEN: - if not resp_data: - # Overflow into resp_data for the first time. - resp_data = bytearray() - resp_data += data - return resp_data - - raise ValueError("Advertising payload too long") - - -async def advertise( - interval_us, - adv_data=None, - resp_data=None, - connectable=True, - limited_disc=False, - br_edr=False, - name=None, - services=None, - appearance=0, - manufacturer=None, - timeout_ms=None, -): - global _incoming_connection, _connect_event - - ensure_active() - - if not adv_data and not resp_data: - # If the user didn't manually specify adv_data / resp_data then - # construct them from the kwargs. Keep adding fields to adv_data, - # overflowing to resp_data if necessary. - # TODO: Try and do better bin-packing than just concatenating in - # order? - - adv_data = bytearray() - - resp_data = _append( - adv_data, - resp_data, - _ADV_TYPE_FLAGS, - struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)), - ) - - # Services are prioritised to go in the advertising data because iOS supports - # filtering scan results by service only, so services must come first. - if services: - for uuid in services: - b = bytes(uuid) - if len(b) == 2: - resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID16_COMPLETE, b) - elif len(b) == 4: - resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID32_COMPLETE, b) - elif len(b) == 16: - resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID128_COMPLETE, b) - - if name: - resp_data = _append(adv_data, resp_data, _ADV_TYPE_NAME, name) - - if appearance: - # See org.bluetooth.characteristic.gap.appearance.xml - resp_data = _append( - adv_data, resp_data, _ADV_TYPE_APPEARANCE, struct.pack("> 2) & 0b10) - sMotD = 1 - ((motD >> 2) & 0b10) - motG = ((motG & 0b0111) * sMotG) * 100 / 7 - motD = ((motD & 0b0111) * sMotD) * 100 / 7 - print("Mot G :", motG) - print("Mot D :", motD) - alphabot.setMotors(left=motG, right=motD) - -while True: - joystickButton = alphabot.getJoystickValue() - if (joystickButton == "center"): - oled.text("Coucou !", 0, 0) - oled.show() - utime.sleep(1) - oled.fill(0) - oled.show() - if (joystickButton == "left"): - alphabot.setMotors(left=-100, right=100) - utime.sleep(1) - alphabot.stop() - if (joystickButton == "right"): - StartCPU("./out.bin", motorCallback) - alphabot.stop() - - diff --git a/tmp/neopixel.py b/tmp/neopixel.py deleted file mode 100644 index 140fa66..0000000 --- a/tmp/neopixel.py +++ /dev/null @@ -1,45 +0,0 @@ -# NeoPixel driver for MicroPython -# MIT license; Copyright (c) 2016 Damien P. George, 2021 Jim Mussared - -from machine import bitstream - -class NeoPixel: - # G R B W - ORDER = (1, 0, 2, 3) - - def __init__(self, pin, n, bpp=3, timing=1): - self.pin = pin - self.n = n - self.bpp = bpp - self.buf = bytearray(n * bpp) - self.pin.init(pin.OUT) - # Timing arg can either be 1 for 800kHz or 0 for 400kHz, - # or a user-specified timing ns tuple (high_0, low_0, high_1, low_1). - self.timing = ( - ((400, 850, 800, 450) if timing else (800, 1700, 1600, 900)) - if isinstance(timing, int) - else timing - ) - - def __len__(self): - return self.n - - def __setitem__(self, i, v): - offset = i * self.bpp - for i in range(self.bpp): - self.buf[offset + self.ORDER[i]] = v[i] - - def __getitem__(self, i): - offset = i * self.bpp - return tuple(self.buf[offset + self.ORDER[i]] for i in range(self.bpp)) - - def fill(self, v): - b = self.buf - for i in range(self.bpp): - c = v[i] - for j in range(self.ORDER[i], len(self.buf), self.bpp): - b[j] = c - - def write(self): - # BITSTREAM_TYPE_HIGH_LOW = 0 - bitstream(self.pin, 0, self.timing, self.buf) diff --git a/tmp/out.bin b/tmp/out.bin deleted file mode 100644 index c599987..0000000 --- a/tmp/out.bin +++ /dev/null @@ -1 +0,0 @@ -àðød€ \ No newline at end of file diff --git a/tmp/pybcdc.inf b/tmp/pybcdc.inf deleted file mode 100644 index a131ec7..0000000 --- a/tmp/pybcdc.inf +++ /dev/null @@ -1,92 +0,0 @@ -; Windows USB CDC ACM Setup File -; Based on INF files which were: -; Copyright (c) 2000 Microsoft Corporation -; Copyright (C) 2007 Microchip Technology Inc. -; Likely to be covered by the MLPL as found at: -; . - -[Version] -Signature="$Windows NT$" -Class=Ports -ClassGuid={4D36E978-E325-11CE-BFC1-08002BE10318} -Provider=%MFGNAME% -LayoutFile=layout.inf -DriverVer=03/11/2010,5.1.2600.3 - -[Manufacturer] -%MFGNAME%=DeviceList, NTamd64 - -[DestinationDirs] -DefaultDestDir=12 - -;--------------------------------------------------------------------- -; Windows 2000/XP/Server2003/Vista/Server2008/7 - 32bit Sections - -[DriverInstall.nt] -include=mdmcpq.inf -CopyFiles=DriverCopyFiles.nt -AddReg=DriverInstall.nt.AddReg - -[DriverCopyFiles.nt] -usbser.sys,,,0x20 - -[DriverInstall.nt.AddReg] -HKR,,DevLoader,,*ntkern -HKR,,NTMPDriver,,usbser.sys -HKR,,EnumPropPages32,,"MsPorts.dll,SerialPortPropPageProvider" - -[DriverInstall.nt.Services] -AddService=usbser, 0x00000002, DriverService.nt - -[DriverService.nt] -DisplayName=%SERVICE% -ServiceType=1 -StartType=3 -ErrorControl=1 -ServiceBinary=%12%\usbser.sys - -;--------------------------------------------------------------------- -; Windows XP/Server2003/Vista/Server2008/7 - 64bit Sections - -[DriverInstall.NTamd64] -include=mdmcpq.inf -CopyFiles=DriverCopyFiles.NTamd64 -AddReg=DriverInstall.NTamd64.AddReg - -[DriverCopyFiles.NTamd64] -usbser.sys,,,0x20 - -[DriverInstall.NTamd64.AddReg] -HKR,,DevLoader,,*ntkern -HKR,,NTMPDriver,,usbser.sys -HKR,,EnumPropPages32,,"MsPorts.dll,SerialPortPropPageProvider" - -[DriverInstall.NTamd64.Services] -AddService=usbser, 0x00000002, DriverService.NTamd64 - -[DriverService.NTamd64] -DisplayName=%SERVICE% -ServiceType=1 -StartType=3 -ErrorControl=1 -ServiceBinary=%12%\usbser.sys - -;--------------------------------------------------------------------- -; Vendor and Product ID Definitions - -[SourceDisksFiles] -[SourceDisksNames] -[DeviceList] -%DESCRIPTION%=DriverInstall, USB\VID_f055&PID_9800&MI_00, USB\VID_f055&PID_9800&MI_01, USB\VID_f055&PID_9801&MI_00, USB\VID_f055&PID_9801&MI_01, USB\VID_f055&PID_9802 - -[DeviceList.NTamd64] -%DESCRIPTION%=DriverInstall, USB\VID_f055&PID_9800&MI_00, USB\VID_f055&PID_9800&MI_01, USB\VID_f055&PID_9801&MI_00, USB\VID_f055&PID_9801&MI_01, USB\VID_f055&PID_9802 - -;--------------------------------------------------------------------- -; String Definitions - -[Strings] -MFGFILENAME="pybcdc" -MFGNAME="MicroPython" -DESCRIPTION="Pyboard USB Comm Port" -SERVICE="USB Serial Driver" diff --git a/tmp/stm32_TRsensors.py b/tmp/stm32_TRsensors.py deleted file mode 100644 index 371d250..0000000 --- a/tmp/stm32_TRsensors.py +++ /dev/null @@ -1,199 +0,0 @@ -""" -QTRSensors.h - Originally Arduino Library for using Pololu QTR reflectance sensors and reflectance sensor arrays - -MIT Licence -Copyright (c) 2008-2012 waveshare Corporation. For more information, see -https://www.waveshare.com/wiki/AlphaBot2-Ar - -Copyright (c) 2021 leomlr (Léo Meillier). For more information, see -https://github.com/vittascience/stm32-libraries -https://vittascience.com/stm32/ - -You may freely modify and share this code, as long as you keep this notice intact. - -Disclaimer: To the extent permitted by law, waveshare provides this work -without any warranty. It might be defective, in which case you agree -to be responsible for all resulting costs and damages. - -Author: Léo Meillier (leomlr) -Date: 07/2021 -Note: library adapted in micropython for using 5 QTR sensors on Alphabot v2 robot controlled by STM32 board. -""" - -import pyb -from micropython import const -import utime - -PIN_CS = 'D10' -PIN_DOUT = 'D11' -PIN_ADDR = 'D12' -PIN_CLK = 'D13' - -NUMSENSORS = const(5) - -QTR_EMITTERS_OFF = const(0x00) -QTR_EMITTERS_ON = const(0x01) -QTR_EMITTERS_ON_AND_OFF = const(0x02) - -QTR_NO_EMITTER_PIN = const(0xff) - -QTR_MAX_SENSORS = const(16) - -class TRSensors(object): - - """ Base class data member initialization (called by derived class init()). """ - def __init__(self, cs=PIN_CS, dout=PIN_DOUT, addr=PIN_ADDR, clk=PIN_CLK): - - self._cs = pyb.Pin(cs, pyb.Pin.OUT) - self._dout = pyb.Pin(dout, pyb.Pin.IN) - self._addr = pyb.Pin(addr, pyb.Pin.OUT) - self._clk = pyb.Pin(clk, pyb.Pin.OUT) - - self._numSensors = NUMSENSORS - - self.calibratedMin = [0] * self._numSensors - self.calibratedMax = [1023] * self._numSensors - self.last_value = 0 - - """ Reads the sensor values using TLC1543 ADC chip into an array. - The values returned are a measure of the reflectance in abstract units, - with higher values corresponding to lower reflectance (e.g. a black - surface or a void). """ - - def analogRead(self): - value = [0]* (self._numSensors+1) - #Read Channel0~channel4 AD value - for j in range(0, self._numSensors+1): - self._cs.off() - for i in range(0,4): - #sent 4-bit Address - if (j >> (3 - i)) & 0x01: - self._addr.on() - else: - self._addr.off() - #read MSB 4-bit data - value[j] <<= 1 - if self._dout.value(): - value[j] |= 0x01 - self._clk.on() - self._clk.off() - for i in range(0, self._numSensors+1): - #read LSB 8-bit data - value[j] <<= 1 - if self._dout.value(): - value[j] |= 0x01 - self._clk.on() - self._clk.off() - #no mean ,just delay - #for i in range(0,6): - # self._clk.on() - # self._clk.off() - utime.sleep_us(100) - self._cs.on() - return value[1:] - - """ Reads the sensors 10 times and uses the results for - calibration. The sensor values are not returned instead, the - maximum and minimum values found over time are stored internally - and used for the readCalibrated() method. """ - - def calibrate(self): - sensor_values = [] - max_sensor_values = [0]*self._numSensors - min_sensor_values = [0]*self._numSensors - - for j in range(0, 10): - sensor_values = self.analogRead() - for i in range(0, self._numSensors): - # set the max we found THIS time - if j == 0 or max_sensor_values[i] < sensor_values[i]: - max_sensor_values[i] = sensor_values[i] - # set the min we found THIS time - if j == 0 or min_sensor_values[i] > sensor_values[i]: - min_sensor_values[i] = sensor_values[i] - - # record the min and max calibration values - for i in range(0, self._numSensors): - if min_sensor_values[i] > self.calibratedMax[i]: - self.calibratedMax[i] = min_sensor_values[i] - if max_sensor_values[i] < self.calibratedMin[i]: - self.calibratedMin[i] = max_sensor_values[i] - - """ Returns values calibrated to a value between 0 and 1000, where - 0 corresponds to the minimum value read by calibrate() and 1000 - corresponds to the maximum value. Calibration values are - stored separately for each sensor, so that differences in the - sensors are accounted for automatically. """ - - def readCalibrated(self): - - # read the needed values - sensor_values = self.analogRead() - - for i in range(self._numSensors): - denominator = self.calibratedMax[i] - self.calibratedMin[i] - value = 0 - if denominator is not 0: - value = (sensor_values[i] - self.calibratedMin[i]) * 1000 / denominator - if value < 0: - value = 0 - elif value > 1000: - value = 1000 - sensor_values[i] = value - - return sensor_values - - """ Operates the same as read calibrated, but also returns an - estimated position of the robot with respect to a line. The - estimate is made using a weighted average of the sensor indices - multiplied by 1000, so that a return value of 0 indicates that - the line is directly below sensor 0, a return value of 1000 - indicates that the line is directly below sensor 1, 2000 - indicates that it's below sensor 2000, etc. Intermediate - values indicate that the line is between two sensors. The - formula is: - - 0*value0 + 1000*value1 + 2000*value2 + ... - -------------------------------------------- - value0 + value1 + value2 + ... - - By default, this function assumes a dark line (high values) - surrounded by white (low values). If your line is light on - black, set the optional second argument white_line to true. In - this case, each sensor value will be replaced by (1000-value) - before the averaging. """ - - def readLine(self, white_line = 0): - - sensor_values = self.readCalibrated() - avg = 0 - sum = 0 - on_line = 0 - for i in range(0, self._numSensors): - value = sensor_values[i] - if white_line: - value = 1000-value - # keep track of whether we see the line at all - if value > 200: - on_line = 1 - - # only average in values that are above a noise threshold - if value > 50: - avg += value * (i * 1000) # this is for the weighted total, - sum += value # this is for the denominator - - if on_line != 1: - # If it last read to the left of center, return 0. - if self.last_value < (self._numSensors - 1)*1000/2: - #print("left") - self.last_value = 0 - - # If it last read to the right of center, return the max. - else: - #print("right") - self.last_value = (self._numSensors - 1)*1000 - - else: - self.last_value = avg/sum - - return self.last_value,sensor_values; diff --git a/tmp/stm32_alphabot_v2.py b/tmp/stm32_alphabot_v2.py deleted file mode 100644 index 9513dfa..0000000 --- a/tmp/stm32_alphabot_v2.py +++ /dev/null @@ -1,265 +0,0 @@ -""" -MicroPython for AlphaBot2-Ar from Waveshare. -https://github.com/vittascience/stm32-libraries -https://www.waveshare.com/wiki/AlphaBot2-Ar - -MIT License -Copyright (c) 2021 leomlr (Léo Meillier) - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" - -__version__ = "0.0.0-auto.0" -__repo__ = "show" - -from stm32_TRsensors import TRSensors -from stm32_pcf8574 import PCF8574 -import machine -import pyb -import utime - -ALPHABOT_V2_PIN_AIN2 = 'A0' -ALPHABOT_V2_PIN_AIN1 = 'A1' -ALPHABOT_V2_PIN_BIN1 = 'A2' -ALPHABOT_V2_PIN_BIN2 = 'A3' - -ALPHABOT_V2_PIN_ECHO = 'D2' -ALPHABOT_V2_PIN_TRIG = 'D3' -ALPHABOT_V2_PIN_IR = 'D4' -ALPHABOT_V2_PIN_PWMB = 'D5' -ALPHABOT_V2_PIN_PWMA = 'D6' -ALPHABOT_V2_PIN_RGB = 'D7' - -ALPHABOT_V2_PIN_OLED_D_C = 'D8' -ALPHABOT_V2_PIN_OLED_RESET = 'D9' - -ALPHABOT_V2_PIN_TRS_CS = 'D10' -ALPHABOT_V2_PIN_TRS_DOUT = 'D11' -ALPHABOT_V2_PIN_TRS_ADDR = 'D12' -ALPHABOT_V2_PIN_TRS_CLK = 'D13' - -ALPHABOT_V2_PCF8574_I2C_ADDR = 0x20 -ALPHABOT_V2_OLED_I2C_ADDR_DC_OFF = 0x3c -ALPHABOT_V2_OLED_I2C_ADDR_DC_ON = 0x3d - -class AlphaBot_v2(object): - - def __init__(self): - self.ain1 = pyb.Pin(ALPHABOT_V2_PIN_AIN1, pyb.Pin.OUT) - self.ain2 = pyb.Pin(ALPHABOT_V2_PIN_AIN2, pyb.Pin.OUT) - self.bin1 = pyb.Pin(ALPHABOT_V2_PIN_BIN1, pyb.Pin.OUT) - self.bin2 = pyb.Pin(ALPHABOT_V2_PIN_BIN2, pyb.Pin.OUT) - - self.pin_PWMA = pyb.Pin(ALPHABOT_V2_PIN_PWMA, pyb.Pin.OUT_PP) - tim_A = pyb.Timer(1, freq=500) - self.PWMA = tim_A.channel(1, pyb.Timer.PWM, pin=self.pin_PWMA) - - self.pin_PWMB = pyb.Pin(ALPHABOT_V2_PIN_PWMB, pyb.Pin.OUT_PP) - tim_B = pyb.Timer(2, freq=500) - self.PWMB = tim_B.channel(1, pyb.Timer.PWM, pin=self.pin_PWMB) - - self.stop() - - print('[Alpha_INFO]: Motors initialised') - - self.trig = pyb.Pin(ALPHABOT_V2_PIN_TRIG, pyb.Pin.OUT) - self.echo = pyb.Pin(ALPHABOT_V2_PIN_ECHO, pyb.Pin.IN) - - self.pin_RGB = pyb.Pin(ALPHABOT_V2_PIN_RGB, pyb.Pin.OUT) - - self.tr_sensors = TRSensors( - cs = ALPHABOT_V2_PIN_TRS_CS, - dout = ALPHABOT_V2_PIN_TRS_DOUT, - addr = ALPHABOT_V2_PIN_TRS_ADDR, - clk = ALPHABOT_V2_PIN_TRS_CLK - ) - - print('[Alpha_INFO]: TR sensors initialised') - - self.i2c = machine.I2C(1) - - self.LEFT_OBSTACLE = 'L' - self.RIGHT_OBSTACLE = 'R' - self.BOTH_OBSTACLE = 'B' - self.NO_OBSTACLE = 'N' - - self.JOYSTICK_UP = 'up' - self.JOYSTICK_RIGHT = 'right' - self.JOYSTICK_LEFT = 'left' - self.JOYSTICK_DOWN = 'down' - self.JOYSTICK_CENTER = 'center' - - print('[Alpha_INFO]: IR detectors initialised (for obstacles)') - - self.pin_IR = pyb.Pin(ALPHABOT_V2_PIN_IR, pyb.Pin.IN) - - print('[Alpha_INFO]: IR receiver initialised (for remotes)') - - self.pin_oled_reset = pyb.Pin(ALPHABOT_V2_PIN_OLED_RESET, pyb.Pin.OUT) - self.pin_oled_reset.off() - utime.sleep_ms(10) - self.pin_oled_reset.on() - self.pin_DC = pyb.Pin(ALPHABOT_V2_PIN_OLED_D_C, pyb.Pin.OUT) - - print('[Alpha_INFO]: OLED screen initialised') - - self._pcf8574 = PCF8574(self.i2c, addr=ALPHABOT_V2_PCF8574_I2C_ADDR) - - def setPWMA(self, value): - self.PWMA.pulse_width_percent(value) - - def setPWMB(self, value): - self.PWMB.pulse_width_percent(value) - - def setMotors(self, left=None, right=None): - if left is not None: - if left >= 0 and left <= 100: - self.ain1.off() - self.ain2.on() - self.setPWMA(left) - elif left >= -100 and left < 0: - self.ain1.on() - self.ain2.off() - self.setPWMA(-left) - if right is not None: - if right >= 0 and right <= 100: - self.bin1.off() - self.bin2.on() - self.setPWMB(right) - elif right >= -100 and right < 0: - self.bin1.on() - self.bin2.off() - self.setPWMB(-right) - - def stop(self): - self.setMotors(left=0, right=0) - - def moveForward(self, speed, duration_ms=0): - self.setMotors(left=speed, right=speed) - if duration_ms: - utime.sleep_ms(duration_ms) - self.stop() - - def moveBackward(self, speed, duration_ms=0): - self.setMotors(left=-speed, right=-speed) - if duration_ms: - utime.sleep_ms(duration_ms) - self.stop() - - def turnLeft(self, speed, duration_ms=0): - if speed < 20: - self.setMotors(left=speed, right=50-speed) - else: - self.setMotors(left=30-speed, right=speed) - if duration_ms: - utime.sleep_ms(duration_ms) - self.stop() - - def turnRight(self, speed, duration_ms=0): - if speed < 20: - self.setMotors(left=50-speed, right=speed) - else: - self.setMotors(left=speed, right=30-speed) - if duration_ms: - utime.sleep_ms(duration_ms) - self.stop() - - def calibrateLineFinder(self): - print("[Alpha_INFO]: TR sensors calibration ...\\n") - for i in range(0, 100): - if i<25 or i>= 75: - self.turnRight(15) - else: - self.turnLeft(15) - self.TRSensors_calibrate() - self.stop() - print("Calibration done.\\n") - print(str(self.tr_sensors.calibratedMin) + '\\n') - print(str(self.tr_sensors.calibratedMax) + '\\n') - utime.sleep_ms(500) - - def TRSensors_calibrate(self): - self.tr_sensors.calibrate() - - def TRSensors_read(self, sensor = 0): - return self.tr_sensors.analogRead() - - def TRSensors_readLine(self, sensor = 0): - position, sensor_values = self.tr_sensors.readLine() - if sensor is 0: - return sensor_values - else: - return sensor_values[sensor-1] - - def TRSensors_position_readLine(self, sensor = 0): - return self.tr_sensors.readLine() - - def readUltrasonicDistance(self, length=15, timeout_us = 30000): - measurements = 0 - for i in range(length): - self.trig.off() - utime.sleep_us(2) - self.trig.on() - utime.sleep_us(10) - self.trig.off() - self.echo.value() - measurements += machine.time_pulse_us(self.echo, 1, timeout_us)/1e6 # t_echo in seconds - duration = measurements/length - return 343 * duration/2 * 100 - - def getOLEDaddr(self): - if self.pin_DC.value(): - return ALPHABOT_V2_OLED_I2C_ADDR_DC_ON - else: - return ALPHABOT_V2_OLED_I2C_ADDR_DC_OFF - - # Drivers for PCF8574T - - def controlBuzzer(self, state): - self._pcf8574.pin(5, state) - - def getJoystickValue(self): - i = 0 - for i in range(5): - if not self._pcf8574.pin(i): break - elif i == 4: i = None - if i == 0: - return self.JOYSTICK_UP - elif i == 1: - return self.JOYSTICK_RIGHT - elif i == 2: - return self.JOYSTICK_LEFT - elif i == 3: - return self.JOYSTICK_DOWN - elif i == 4: - return self.JOYSTICK_CENTER - else: - return None - - def readInfrared(self): - left = not self._pcf8574.pin(7) - right = not self._pcf8574.pin(6) - if left and not right: - return self.LEFT_OBSTACLE - elif not left and right: - return self.RIGHT_OBSTACLE - elif left and right: - return self.BOTH_OBSTACLE - else: - return self.NO_OBSTACLE diff --git a/tmp/stm32_bleAdvertising.py b/tmp/stm32_bleAdvertising.py deleted file mode 100644 index 74de473..0000000 --- a/tmp/stm32_bleAdvertising.py +++ /dev/null @@ -1,91 +0,0 @@ -# Exemple pour générer des trames d'advertising pour le BLE - -from micropython import const -import struct -import bluetooth - -# Les trames d'advertising sont sont des paquets répétés ayant la structure suivante : -# 1 octet indiquant la taille des données (N + 1) -# 1 octet indiquant le type de données (voir les constantes ci-dessous) -# N octets de données du type indiqué - -_ADV_TYPE_FLAGS = const(0x01) -_ADV_TYPE_NAME = const(0x09) -_ADV_TYPE_UUID16_COMPLETE = const(0x3) -_ADV_TYPE_UUID32_COMPLETE = const(0x5) -_ADV_TYPE_UUID128_COMPLETE = const(0x7) -_ADV_TYPE_UUID16_MORE = const(0x2) -_ADV_TYPE_UUID32_MORE = const(0x4) -_ADV_TYPE_UUID128_MORE = const(0x6) -_ADV_TYPE_APPEARANCE = const(0x19) -_ADV_TYPE_MANUFACTURER = const(0xFF) - -# Génère une trame qui sera passée à la méthode gap_advertise(adv_data=...). - - -def adv_payload( - limited_disc=False, - br_edr=False, - name=None, - services=None, - appearance=0, - manufacturer=0, -): - payload = bytearray() - - def _append(adv_type, value): - nonlocal payload - payload += struct.pack("BB", len(value) + 1, adv_type) + value - - _append( - _ADV_TYPE_FLAGS, - struct.pack("B", (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)), - ) - - if name: - _append(_ADV_TYPE_NAME, name) - - if services: - for uuid in services: - b = bytes(uuid) - if len(b) == 2: - _append(_ADV_TYPE_UUID16_COMPLETE, b) - elif len(b) == 4: - _append(_ADV_TYPE_UUID32_COMPLETE, b) - elif len(b) == 16: - _append(_ADV_TYPE_UUID128_COMPLETE, b) - - if appearance: - # Voir org.bluetooth.characteristic.gap.appearance.xml - _append(_ADV_TYPE_APPEARANCE, struct.pack("= thresh: - self.callback(cmd, addr, ext, *self.args) - else: - self._errf(cmd) - - def error_function(self, func): - self._errf = func - - def close(self): - self._pin.irq(handler = None) - self.tim.deinit() diff --git a/tmp/stm32_nec.py b/tmp/stm32_nec.py deleted file mode 100644 index a8d8b19..0000000 --- a/tmp/stm32_nec.py +++ /dev/null @@ -1,62 +0,0 @@ -# nec.py Decoder for IR remote control using synchronous code -# Supports NEC protocol. -# For a remote using NEC see https://www.adafruit.com/products/389 - -# Author: Peter Hinch -# Copyright Peter Hinch 2020 Released under the MIT license - -from utime import ticks_us, ticks_diff -from stm32_ir_receiver import IR_RX - -class NEC_ABC(IR_RX): - def __init__(self, pin, extended, callback, *args): - # Block lasts <= 80ms (extended mode) and has 68 edges - super().__init__(pin, 68, 80, callback, *args) - self._extended = extended - self._addr = 0 - - def decode(self, _): - try: - if self.edge > 68: - raise RuntimeError(self.OVERRUN) - width = ticks_diff(self._times[1], self._times[0]) - if width < 4000: # 9ms leading mark for all valid data - raise RuntimeError(self.BADSTART) - width = ticks_diff(self._times[2], self._times[1]) - if width > 3000: # 4.5ms space for normal data - if self.edge < 68: # Haven't received the correct number of edges - raise RuntimeError(self.BADBLOCK) - # Time spaces only (marks are always 562.5µs) - # Space is 1.6875ms (1) or 562.5µs (0) - # Skip last bit which is always 1 - val = 0 - for edge in range(3, 68 - 2, 2): - val >>= 1 - if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120: - val |= 0x80000000 - elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges. - raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP) # Treat REPEAT as error. - else: - raise RuntimeError(self.BADSTART) - addr = val & 0xff # 8 bit addr - cmd = (val >> 16) & 0xff - if cmd != (val >> 24) ^ 0xff: - raise RuntimeError(self.BADDATA) - if addr != ((val >> 8) ^ 0xff) & 0xff: # 8 bit addr doesn't match check - if not self._extended: - raise RuntimeError(self.BADADDR) - addr |= val & 0xff00 # pass assumed 16 bit address to callback - self._addr = addr - except RuntimeError as e: - cmd = e.args[0] - addr = self._addr if cmd == self.REPEAT else 0 # REPEAT uses last address - # Set up for new data burst and run user callback - self.do_callback(cmd, addr, 0, self.REPEAT) - -class NEC_8(NEC_ABC): - def __init__(self, pin, callback, *args): - super().__init__(pin, False, callback, *args) - -class NEC_16(NEC_ABC): - def __init__(self, pin, callback, *args): - super().__init__(pin, True, callback, *args) diff --git a/tmp/stm32_pcf8574.py b/tmp/stm32_pcf8574.py deleted file mode 100644 index 065d6a8..0000000 --- a/tmp/stm32_pcf8574.py +++ /dev/null @@ -1,49 +0,0 @@ -class PCF8574: - def __init__(self, i2c, addr=0x20): - self._i2c = i2c - i2cModules = self._i2c.scan() - if addr not in i2cModules: - error = "Unable to find module 'PCF8574' at address " + str(hex(addr)) + ". Please check connections with the board.\n" - error += "[Info] I2C address.es detected: " + str([hex(a) for a in i2cModules]) - raise ValueError(error) - self._addr = addr - self._port = bytearray(1) - - @property - def port(self): - self._read() - return self._port[0] - - @port.setter - def port(self, value): - self._port[0] = value & 0xff - self._write() - - def pin(self, pin, value=None): - pin = self.validate_pin(pin) - if value is None: - self._read() - return (self._port[0] >> pin) & 1 - else: - if value: - self._port[0] |= (1 << (pin)) - else: - self._port[0] &= ~(1 << (pin)) - self._write() - - def toggle(self, pin): - pin = self.validate_pin(pin) - self._port[0] ^= (1 << (pin)) - self._write() - - def validate_pin(self, pin): - # pin valid range 0..7 - if not 0 <= pin <= 7: - raise ValueError('Invalid pin {}. Use 0-7.'.format(pin)) - return pin - - def _read(self): - self._i2c.readfrom_into(self._addr, self._port) - - def _write(self): - self._i2c.writeto(self._addr, self._port) diff --git a/tmp/stm32_ssd1306.py b/tmp/stm32_ssd1306.py deleted file mode 100644 index 96e78ac..0000000 --- a/tmp/stm32_ssd1306.py +++ /dev/null @@ -1,131 +0,0 @@ -# MicroPython SSD1306 OLED I2C driver -from micropython import const -import framebuf -import utime - -SSD1306_I2C_ADDR = 0x3C - -# register definitions -SET_CONTRAST = const(0x81) -SET_ENTIRE_ON = const(0xA4) -SET_NORM_INV = const(0xA6) -SET_DISP = const(0xAE) -SET_MEM_ADDR = const(0x20) -SET_COL_ADDR = const(0x21) -SET_PAGE_ADDR = const(0x22) -SET_DISP_START_LINE = const(0x40) -SET_SEG_REMAP = const(0xA0) -SET_MUX_RATIO = const(0xA8) -SET_COM_OUT_DIR = const(0xC0) -SET_DISP_OFFSET = const(0xD3) -SET_COM_PIN_CFG = const(0xDA) -SET_DISP_CLK_DIV = const(0xD5) -SET_PRECHARGE = const(0xD9) -SET_VCOM_DESEL = const(0xDB) -SET_CHARGE_PUMP = const(0x8D) - -# Subclassing FrameBuffer provides support for graphics primitives -# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html -class SSD1306(framebuf.FrameBuffer): - def __init__(self, width, height, external_vcc): - self.width = width - self.height = height - self.external_vcc = external_vcc - self.pages = self.height // 8 - self.buffer = bytearray(self.pages * self.width) - super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) - self.init_display() - - def init_display(self): - for cmd in ( - SET_DISP, # display off - # address setting - SET_MEM_ADDR, - 0x00, # horizontal - # resolution and layout - SET_DISP_START_LINE, # start at line 0 - SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0 - SET_MUX_RATIO, - self.height - 1, - SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0 - SET_DISP_OFFSET, - 0x00, - SET_COM_PIN_CFG, - 0x02 if self.width > 2 * self.height else 0x12, - # timing and driving scheme - SET_DISP_CLK_DIV, - 0x80, - SET_PRECHARGE, - 0x22 if self.external_vcc else 0xF1, - SET_VCOM_DESEL, - 0x30, # 0.83*Vcc - # display - SET_CONTRAST, - 0xFF, # maximum - SET_ENTIRE_ON, # output follows RAM contents - SET_NORM_INV, # not inverted - # charge pump - SET_CHARGE_PUMP, - 0x10 if self.external_vcc else 0x14, - SET_DISP | 0x01, # display on - ): # on - self.write_cmd(cmd) - self.fill(0) - self.show() - - def poweroff(self): - self.write_cmd(SET_DISP) - - def poweron(self): - self.write_cmd(SET_DISP | 0x01) - - def contrast(self, contrast): - self.write_cmd(SET_CONTRAST) - self.write_cmd(contrast) - - def invert(self, invert): - self.write_cmd(SET_NORM_INV | (invert & 1)) - - def rotate(self, rotate): - self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3)) - self.write_cmd(SET_SEG_REMAP | (rotate & 1)) - - def show(self): - x0 = 0 - x1 = self.width - 1 - if self.width == 64: - # displays with width of 64 pixels are shifted by 32 - x0 += 32 - x1 += 32 - self.write_cmd(SET_COL_ADDR) - self.write_cmd(x0) - self.write_cmd(x1) - self.write_cmd(SET_PAGE_ADDR) - self.write_cmd(0) - self.write_cmd(self.pages - 1) - self.write_data(self.buffer) - -class SSD1306_I2C(SSD1306): - def __init__(self, width, height, i2c, addr=SSD1306_I2C_ADDR, external_vcc=False): - if i2c == None: - raise ValueError("I2C object 'SSD1306' needed as argument!") - self._i2c = i2c - utime.sleep_ms(200) - i2cModules = self._i2c.scan() - if addr not in i2cModules: - error = "Unable to find module 'SSD1306' at address " + str(hex(addr)) + ". Please check connections with the board.\n" - error += "[Info] I2C address.es detected: " + str([hex(a) for a in i2cModules]) - raise ValueError(error) - self._addr = addr - self.temp = bytearray(2) - self.write_list = [b"\x40", None] # Co=0, D/C#=1 - super().__init__(width, height, external_vcc) - - def write_cmd(self, cmd): - self.temp[0] = 0x80 # Co=1, D/C#=0 - self.temp[1] = cmd - self._i2c.writeto(self._addr, self.temp) - - def write_data(self, buf): - self.write_list[1] = buf - self._i2c.writevto(self._addr, self.write_list) diff --git a/tmp/stm32_vl53l0x.py b/tmp/stm32_vl53l0x.py deleted file mode 100644 index 7878955..0000000 --- a/tmp/stm32_vl53l0x.py +++ /dev/null @@ -1,525 +0,0 @@ -""" -MicroPython for Grove Time Of Flight VL53L0X sensor (I2C). -https://github.com/vittascience/stm32-libraries -https://wiki.seeedstudio.com/Grove-Time_of_Flight_Distance_Sensor-VL53L0X/ - -MIT License -Copyright (c) 2020 leomlr (Léo Meillier) - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" - -__version__ = "0.0.0-auto.0" -__repo__ = "https://github.com/Vittascience/stm32-libraries" - -from micropython import const -import utime -import math - -_VL53L0X_IIC_ADDR = const(0x29) - -# Configuration constants: -_SYSRANGE_START = const(0x00) -_SYSTEM_THRESH_HIGH = const(0x0C) -_SYSTEM_THRESH_LOW = const(0x0E) -_SYSTEM_SEQUENCE_CONFIG = const(0x01) -_SYSTEM_RANGE_CONFIG = const(0x09) -_SYSTEM_INTERMEASUREMENT_PERIOD = const(0x04) -_SYSTEM_INTERRUPT_CONFIG_GPIO = const(0x0A) -_GPIO_HV_MUX_ACTIVE_HIGH = const(0x84) -_SYSTEM_INTERRUPT_CLEAR = const(0x0B) -_RESULT_INTERRUPT_STATUS = const(0x13) -_RESULT_RANGE_STATUS = const(0x14) -_RESULT_CORE_AMBIENT_WINDOW_EVENTS_RTN = const(0xBC) -_RESULT_CORE_RANGING_TOTAL_EVENTS_RTN = const(0xC0) -_RESULT_CORE_AMBIENT_WINDOW_EVENTS_REF = const(0xD0) -_RESULT_CORE_RANGING_TOTAL_EVENTS_REF = const(0xD4) -_RESULT_PEAK_SIGNAL_RATE_REF = const(0xB6) -_ALGO_PART_TO_PART_RANGE_OFFSET_MM = const(0x28) -_I2C_SLAVE_DEVICE_ADDRESS = const(0x8A) -_MSRC_CONFIG_CONTROL = const(0x60) -_PRE_RANGE_CONFIG_MIN_SNR = const(0x27) -_PRE_RANGE_CONFIG_VALID_PHASE_LOW = const(0x56) -_PRE_RANGE_CONFIG_VALID_PHASE_HIGH = const(0x57) -_PRE_RANGE_MIN_COUNT_RATE_RTN_LIMIT = const(0x64) -_FINAL_RANGE_CONFIG_MIN_SNR = const(0x67) -_FINAL_RANGE_CONFIG_VALID_PHASE_LOW = const(0x47) -_FINAL_RANGE_CONFIG_VALID_PHASE_HIGH = const(0x48) -_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT = const(0x44) -_PRE_RANGE_CONFIG_SIGMA_THRESH_HI = const(0x61) -_PRE_RANGE_CONFIG_SIGMA_THRESH_LO = const(0x62) -_PRE_RANGE_CONFIG_VCSEL_PERIOD = const(0x50) -_PRE_RANGE_CONFIG_TIMEOUT_MACROP_HI = const(0x51) -_PRE_RANGE_CONFIG_TIMEOUT_MACROP_LO = const(0x52) -_SYSTEM_HISTOGRAM_BIN = const(0x81) -_HISTOGRAM_CONFIG_INITIAL_PHASE_SELECT = const(0x33) -_HISTOGRAM_CONFIG_READOUT_CTRL = const(0x55) -_FINAL_RANGE_CONFIG_VCSEL_PERIOD = const(0x70) -_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI = const(0x71) -_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_LO = const(0x72) -_CROSSTALK_COMPENSATION_PEAK_RATE_MCPS = const(0x20) -_MSRC_CONFIG_TIMEOUT_MACROP = const(0x46) -_SOFT_RESET_GO2_SOFT_RESET_N = const(0xBF) -_IDENTIFICATION_MODEL_ID = const(0xC0) -_IDENTIFICATION_REVISION_ID = const(0xC2) -_OSC_CALIBRATE_VAL = const(0xF8) -_GLOBAL_CONFIG_VCSEL_WIDTH = const(0x32) -_GLOBAL_CONFIG_SPAD_ENABLES_REF_0 = const(0xB0) -_GLOBAL_CONFIG_SPAD_ENABLES_REF_1 = const(0xB1) -_GLOBAL_CONFIG_SPAD_ENABLES_REF_2 = const(0xB2) -_GLOBAL_CONFIG_SPAD_ENABLES_REF_3 = const(0xB3) -_GLOBAL_CONFIG_SPAD_ENABLES_REF_4 = const(0xB4) -_GLOBAL_CONFIG_SPAD_ENABLES_REF_5 = const(0xB5) -_GLOBAL_CONFIG_REF_EN_START_SELECT = const(0xB6) -_DYNAMIC_SPAD_NUM_REQUESTED_REF_SPAD = const(0x4E) -_DYNAMIC_SPAD_REF_EN_START_OFFSET = const(0x4F) -_POWER_MANAGEMENT_GO1_POWER_FORCE = const(0x80) -_VHV_CONFIG_PAD_SCL_SDA__EXTSUP_HV = const(0x89) -_ALGO_PHASECAL_LIM = const(0x30) -_ALGO_PHASECAL_CONFIG_TIMEOUT = const(0x30) -_VCSEL_PERIOD_PRE_RANGE = const(0) -_VCSEL_PERIOD_FINAL_RANGE = const(1) - -def _decode_timeout(val): - # format: "(LSByte * 2^MSByte) + 1" - return float(val & 0xFF) * math.pow(2.0, ((val & 0xFF00) >> 8)) + 1 - -def _encode_timeout(timeout_mclks): - # format: "(LSByte * 2^MSByte) + 1" - timeout_mclks = int(timeout_mclks) & 0xFFFF - ls_byte = 0 - ms_byte = 0 - if timeout_mclks > 0: - ls_byte = timeout_mclks - 1 - while ls_byte > 255: - ls_byte >>= 1 - ms_byte += 1 - return ((ms_byte << 8) | (ls_byte & 0xFF)) & 0xFFFF - return 0 - -def _timeout_mclks_to_microseconds(timeout_period_mclks, vcsel_period_pclks): - macro_period_ns = ((2304 * (vcsel_period_pclks) * 1655) + 500) // 1000 - return ((timeout_period_mclks * macro_period_ns) + (macro_period_ns // 2)) // 1000 - -def _timeout_microseconds_to_mclks(timeout_period_us, vcsel_period_pclks): - macro_period_ns = ((2304 * (vcsel_period_pclks) * 1655) + 500) // 1000 - return ((timeout_period_us * 1000) + (macro_period_ns // 2)) // macro_period_ns - -class VL53L0X: - """Driver for the VL53L0X distance sensor.""" - - def __init__(self, i2c, address=_VL53L0X_IIC_ADDR, io_timeout_s=0): - # pylint: disable=too-many-statements - self._i2c = i2c - self._addr = address - self.io_timeout_s = io_timeout_s - # Check identification registers for expected values. - # From section 3.2 of the datasheet. - if ( - self._read_u8(0xC0) is not 0xEE - or self._read_u8(0xC1) is not 0xAA - or self._read_u8(0xC2) is not 0x10 - ): - raise RuntimeError("Failed to find expected ID register values. Check wiring!") - # Initialize access to the sensor. This is based on the logic from: - # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp - # Set I2C standard mode. - for pair in ((0x88, 0x00), (0x80, 0x01), (0xFF, 0x01), (0x00, 0x00)): - self._write_u8(pair[0], pair[1]) - self._stop_variable = self._read_u8(0x91) - for pair in ((0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)): - self._write_u8(pair[0], pair[1]) - # disable SIGNAL_RATE_MSRC (bit 1) and SIGNAL_RATE_PRE_RANGE (bit 4) - # limit checks - config_control = self._read_u8(_MSRC_CONFIG_CONTROL) | 0x12 - self._write_u8(_MSRC_CONFIG_CONTROL, config_control) - # set final range signal rate limit to 0.25 MCPS (million counts per - # second) - self.signal_rate_limit = 0.25 - self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xFF) - spad_count, spad_is_aperture = self._get_spad_info() - # The SPAD map (RefGoodSpadMap) is read by - # VL53L0X_get_info_from_device() in the API, but the same data seems to - # be more easily readable from GLOBAL_CONFIG_SPAD_ENABLES_REF_0 through - # _6, so read it from there. - ref_spad_map = bytearray(1) - ref_spad_map[0] = _GLOBAL_CONFIG_SPAD_ENABLES_REF_0 - self._i2c.writeto(self._addr, ref_spad_map) - buf = bytearray(6) - self._i2c.readfrom_mem_into(self._addr, ref_spad_map[0], buf) - ref_spad_map.extend(buf) - - for pair in ( - (0xFF, 0x01), - (_DYNAMIC_SPAD_REF_EN_START_OFFSET, 0x00), - (_DYNAMIC_SPAD_NUM_REQUESTED_REF_SPAD, 0x2C), - (0xFF, 0x00), - (_GLOBAL_CONFIG_REF_EN_START_SELECT, 0xB4), - ): - self._write_u8(pair[0], pair[1]) - - first_spad_to_enable = 12 if spad_is_aperture else 0 - spads_enabled = 0 - for i in range(48): - if i < first_spad_to_enable or spads_enabled == spad_count: - # This bit is lower than the first one that should be enabled, - # or (reference_spad_count) bits have already been enabled, so - # zero this bit. - ref_spad_map[1 + (i // 8)] &= ~(1 << (i % 8)) - elif (ref_spad_map[1 + (i // 8)] >> (i % 8)) & 0x1 > 0: - spads_enabled += 1 - self._i2c.writeto(self._addr, ref_spad_map) - for pair in ( - (0xFF, 0x01), - (0x00, 0x00), - (0xFF, 0x00), - (0x09, 0x00), - (0x10, 0x00), - (0x11, 0x00), - (0x24, 0x01), - (0x25, 0xFF), - (0x75, 0x00), - (0xFF, 0x01), - (0x4E, 0x2C), - (0x48, 0x00), - (0x30, 0x20), - (0xFF, 0x00), - (0x30, 0x09), - (0x54, 0x00), - (0x31, 0x04), - (0x32, 0x03), - (0x40, 0x83), - (0x46, 0x25), - (0x60, 0x00), - (0x27, 0x00), - (0x50, 0x06), - (0x51, 0x00), - (0x52, 0x96), - (0x56, 0x08), - (0x57, 0x30), - (0x61, 0x00), - (0x62, 0x00), - (0x64, 0x00), - (0x65, 0x00), - (0x66, 0xA0), - (0xFF, 0x01), - (0x22, 0x32), - (0x47, 0x14), - (0x49, 0xFF), - (0x4A, 0x00), - (0xFF, 0x00), - (0x7A, 0x0A), - (0x7B, 0x00), - (0x78, 0x21), - (0xFF, 0x01), - (0x23, 0x34), - (0x42, 0x00), - (0x44, 0xFF), - (0x45, 0x26), - (0x46, 0x05), - (0x40, 0x40), - (0x0E, 0x06), - (0x20, 0x1A), - (0x43, 0x40), - (0xFF, 0x00), - (0x34, 0x03), - (0x35, 0x44), - (0xFF, 0x01), - (0x31, 0x04), - (0x4B, 0x09), - (0x4C, 0x05), - (0x4D, 0x04), - (0xFF, 0x00), - (0x44, 0x00), - (0x45, 0x20), - (0x47, 0x08), - (0x48, 0x28), - (0x67, 0x00), - (0x70, 0x04), - (0x71, 0x01), - (0x72, 0xFE), - (0x76, 0x00), - (0x77, 0x00), - (0xFF, 0x01), - (0x0D, 0x01), - (0xFF, 0x00), - (0x80, 0x01), - (0x01, 0xF8), - (0xFF, 0x01), - (0x8E, 0x01), - (0x00, 0x01), - (0xFF, 0x00), - (0x80, 0x00), - ): - self._write_u8(pair[0], pair[1]) - - self._write_u8(_SYSTEM_INTERRUPT_CONFIG_GPIO, 0x04) - gpio_hv_mux_active_high = self._read_u8(_GPIO_HV_MUX_ACTIVE_HIGH) - self._write_u8( - _GPIO_HV_MUX_ACTIVE_HIGH, gpio_hv_mux_active_high & ~0x10 - ) # active low - self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01) - self._measurement_timing_budget_us = self.measurement_timing_budget - self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xE8) - self.measurement_timing_budget = self._measurement_timing_budget_us - self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0x01) - self._perform_single_ref_calibration(0x40) - self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0x02) - self._perform_single_ref_calibration(0x00) - # "restore the previous Sequence Config" - self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xE8) - - def _read_u8(self, address): - # Read an 8-bit unsigned value from the specified 8-bit address. - buf = self._i2c.readfrom_mem(self._addr, address, 1) - return buf[0] - - def _read_u16(self, address): - # Read a 16-bit BE unsigned value from the specified 8-bit address. - buf = self._i2c.readfrom_mem(self._addr, address, 2) - return (buf[0] << 8) | buf[1] - - def _write_u8(self, address, val): - # Write an 8-bit unsigned value to the specified 8-bit address. - self._i2c.writeto(self._addr, bytearray([address & 0xFF, val & 0xFF])) - - def _write_u16(self, address, val): - # Write a 16-bit BE unsigned value to the specified 8-bit address. - self._i2c.writeto(self._addr, bytearray([address & 0xFF, (val >> 8) & 0xFF, val & 0xFF])) - - def _get_spad_info(self): - # Get reference SPAD count and type, returned as a 2-tuple of - # count and boolean is_aperture. Based on code from: - # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp - for pair in ((0x80, 0x01), (0xFF, 0x01), (0x00, 0x00), (0xFF, 0x06)): - self._write_u8(pair[0], pair[1]) - self._write_u8(0x83, self._read_u8(0x83) | 0x04) - for pair in ( - (0xFF, 0x07), - (0x81, 0x01), - (0x80, 0x01), - (0x94, 0x6B), - (0x83, 0x00), - ): - self._write_u8(pair[0], pair[1]) - start = utime.gmtime() - while self._read_u8(0x83) == 0x00: - if ( - self.io_timeout_s > 0 - and (utime.gmtime() - start) >= self.io_timeout_s - ): - raise RuntimeError("Timeout waiting for VL53L0X!") - self._write_u8(0x83, 0x01) - tmp = self._read_u8(0x92) - count = tmp & 0x7F - is_aperture = ((tmp >> 7) & 0x01) == 1 - for pair in ((0x81, 0x00), (0xFF, 0x06)): - self._write_u8(pair[0], pair[1]) - self._write_u8(0x83, self._read_u8(0x83) & ~0x04) - for pair in ((0xFF, 0x01), (0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)): - self._write_u8(pair[0], pair[1]) - return (count, is_aperture) - - def _perform_single_ref_calibration(self, vhv_init_byte): - # based on VL53L0X_perform_single_ref_calibration() from ST API. - self._write_u8(_SYSRANGE_START, 0x01 | vhv_init_byte & 0xFF) - start = utime.gmtime() - while (self._read_u8(_RESULT_INTERRUPT_STATUS) & 0x07) == 0: - if ( - self.io_timeout_s > 0 - and (utime.gmtime() - start) >= self.io_timeout_s - ): - raise RuntimeError("Timeout waiting for VL53L0X!") - self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01) - self._write_u8(_SYSRANGE_START, 0x00) - - def _get_vcsel_pulse_period(self, vcsel_period_type): - # pylint: disable=no-else-return - # Disable should be removed when refactor can be tested - if vcsel_period_type == _VCSEL_PERIOD_PRE_RANGE: - val = self._read_u8(_PRE_RANGE_CONFIG_VCSEL_PERIOD) - return (((val) + 1) & 0xFF) << 1 - elif vcsel_period_type == _VCSEL_PERIOD_FINAL_RANGE: - val = self._read_u8(_FINAL_RANGE_CONFIG_VCSEL_PERIOD) - return (((val) + 1) & 0xFF) << 1 - return 255 - - def _get_sequence_step_enables(self): - # based on VL53L0X_GetSequenceStepEnables() from ST API - sequence_config = self._read_u8(_SYSTEM_SEQUENCE_CONFIG) - tcc = (sequence_config >> 4) & 0x1 > 0 - dss = (sequence_config >> 3) & 0x1 > 0 - msrc = (sequence_config >> 2) & 0x1 > 0 - pre_range = (sequence_config >> 6) & 0x1 > 0 - final_range = (sequence_config >> 7) & 0x1 > 0 - return (tcc, dss, msrc, pre_range, final_range) - - def _get_sequence_step_timeouts(self, pre_range): - # based on get_sequence_step_timeout() from ST API but modified by - # pololu here: - # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp - pre_range_vcsel_period_pclks = self._get_vcsel_pulse_period( - _VCSEL_PERIOD_PRE_RANGE - ) - msrc_dss_tcc_mclks = (self._read_u8(_MSRC_CONFIG_TIMEOUT_MACROP) + 1) & 0xFF - msrc_dss_tcc_us = _timeout_mclks_to_microseconds( - msrc_dss_tcc_mclks, pre_range_vcsel_period_pclks - ) - pre_range_mclks = _decode_timeout( - self._read_u16(_PRE_RANGE_CONFIG_TIMEOUT_MACROP_HI) - ) - pre_range_us = _timeout_mclks_to_microseconds( - pre_range_mclks, pre_range_vcsel_period_pclks - ) - final_range_vcsel_period_pclks = self._get_vcsel_pulse_period( - _VCSEL_PERIOD_FINAL_RANGE - ) - final_range_mclks = _decode_timeout( - self._read_u16(_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI) - ) - if pre_range: - final_range_mclks -= pre_range_mclks - final_range_us = _timeout_mclks_to_microseconds( - final_range_mclks, final_range_vcsel_period_pclks - ) - return ( - msrc_dss_tcc_us, - pre_range_us, - final_range_us, - final_range_vcsel_period_pclks, - pre_range_mclks, - ) - - @property - def signal_rate_limit(self): - """The signal rate limit in mega counts per second.""" - val = self._read_u16(_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT) - # Return value converted from 16-bit 9.7 fixed point to float. - return val / (1 << 7) - - @signal_rate_limit.setter - def signal_rate_limit(self, val): - assert 0.0 <= val <= 511.99 - # Convert to 16-bit 9.7 fixed point value from a float. - val = int(val * (1 << 7)) - self._write_u16(_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT, val) - - @property - def measurement_timing_budget(self): - """The measurement timing budget in microseconds.""" - budget_us = 1910 + 960 # Start overhead + end overhead. - tcc, dss, msrc, pre_range, final_range = self._get_sequence_step_enables() - step_timeouts = self._get_sequence_step_timeouts(pre_range) - msrc_dss_tcc_us, pre_range_us, final_range_us, _, _ = step_timeouts - if tcc: - budget_us += msrc_dss_tcc_us + 590 - if dss: - budget_us += 2 * (msrc_dss_tcc_us + 690) - elif msrc: - budget_us += msrc_dss_tcc_us + 660 - if pre_range: - budget_us += pre_range_us + 660 - if final_range: - budget_us += final_range_us + 550 - self._measurement_timing_budget_us = budget_us - return budget_us - - @measurement_timing_budget.setter - def measurement_timing_budget(self, budget_us): - # pylint: disable=too-many-locals - assert budget_us >= 20000 - used_budget_us = 1320 + 960 # Start (diff from get) + end overhead - tcc, dss, msrc, pre_range, final_range = self._get_sequence_step_enables() - step_timeouts = self._get_sequence_step_timeouts(pre_range) - msrc_dss_tcc_us, pre_range_us, _ = step_timeouts[:3] - final_range_vcsel_period_pclks, pre_range_mclks = step_timeouts[3:] - if tcc: - used_budget_us += msrc_dss_tcc_us + 590 - if dss: - used_budget_us += 2 * (msrc_dss_tcc_us + 690) - elif msrc: - used_budget_us += msrc_dss_tcc_us + 660 - if pre_range: - used_budget_us += pre_range_us + 660 - if final_range: - used_budget_us += 550 - # "Note that the final range timeout is determined by the timing - # budget and the sum of all other timeouts within the sequence. - # If there is no room for the final range timeout, then an error - # will be set. Otherwise the remaining time will be applied to - # the final range." - if used_budget_us > budget_us: - raise ValueError("Requested timeout too big.") - final_range_timeout_us = budget_us - used_budget_us - final_range_timeout_mclks = _timeout_microseconds_to_mclks( - final_range_timeout_us, final_range_vcsel_period_pclks - ) - if pre_range: - final_range_timeout_mclks += pre_range_mclks - self._write_u16( - _FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI, - _encode_timeout(final_range_timeout_mclks), - ) - self._measurement_timing_budget_us = budget_us - - def getRangeMillimeters(self): - """Perform a single reading of the range for an object in front of - the sensor and return the distance in millimeters. - """ - # Adapted from readRangeSingleMillimeters & - # readRangeContinuousMillimeters in pololu code at: - # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp - for pair in ( - (0x80, 0x01), - (0xFF, 0x01), - (0x00, 0x00), - (0x91, self._stop_variable), - (0x00, 0x01), - (0xFF, 0x00), - (0x80, 0x00), - (_SYSRANGE_START, 0x01), - ): - self._write_u8(pair[0], pair[1]) - start = utime.gmtime() - while (self._read_u8(_SYSRANGE_START) & 0x01) > 0: - if (self.io_timeout_s > 0 and (utime.gmtime() - start) >= self.io_timeout_s): - raise RuntimeError("Timeout waiting for VL53L0X!") - start = utime.gmtime() - while (self._read_u8(_RESULT_INTERRUPT_STATUS) & 0x07) == 0: - if (self.io_timeout_s > 0 and (utime.gmtime() - start) >= self.io_timeout_s): - raise RuntimeError("Timeout waiting for VL53L0X!") - # assumptions: Linearity Corrective Gain is 1000 (default) - # fractional ranging is not enabled - range_mm = self._read_u16(_RESULT_RANGE_STATUS + 10) - self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01) - return range_mm - - def set_address(self, new_address): - """Set a new I2C address to the instantaited object. This is only called when using - multiple VL53L0X sensors on the same I2C bus (SDA & SCL pins). See also the - `example `_ for proper usage. - :param int new_address: The 7-bit `int` that is to be assigned to the VL53L0X sensor. - The address that is assigned should NOT be already in use by another device on the - I2C bus. - .. important:: To properly set the address to an individual VL53L0X sensor, you must - first ensure that all other VL53L0X sensors (using the default address of ``0x29``) - on the same I2C bus are in their off state by pulling the "SHDN" pins LOW. When the - "SHDN" pin is pulled HIGH again the default I2C address is ``0x29``. - """ - self._i2c.write(_I2C_SLAVE_DEVICE_ADDRESS, new_address & 0x7F)