From af122fc3eb542b7ce658d4159d57f1ff33eab512 Mon Sep 17 00:00:00 2001 From: POUDEROUX Tom Date: Sun, 22 Mar 2026 02:00:04 +0100 Subject: [PATCH] Avancement Epreuve 5 --- Interpreteur.py | 291 +++++++++++++++++++ Path.asm | 41 +++ assembleur.py | 2 +- main.py | 120 +++++++- notes.txt | 30 ++ out.bin | Bin 12 -> 48 bytes test_bin_epreuve3/cmp_reg.bin | 1 - test_bin_epreuve3/push.bin | 1 - tmp/Interpreteur.py | 293 +++++++++++++++++++ tmp/README.txt | 12 + tmp/aioble/__init__.py | 32 +++ tmp/aioble/central.py | 297 +++++++++++++++++++ tmp/aioble/client.py | 456 +++++++++++++++++++++++++++++ tmp/aioble/core.py | 78 +++++ tmp/aioble/device.py | 295 +++++++++++++++++++ tmp/aioble/l2cap.py | 214 ++++++++++++++ tmp/aioble/peripheral.py | 179 ++++++++++++ tmp/aioble/security.py | 178 ++++++++++++ tmp/aioble/server.py | 344 ++++++++++++++++++++++ tmp/boot.py | 11 + tmp/buzzer.py | 30 ++ tmp/main.py | 54 ++++ tmp/neopixel.py | 45 +++ tmp/out.bin | 1 + tmp/pybcdc.inf | 92 ++++++ tmp/stm32_TRsensors.py | 199 +++++++++++++ tmp/stm32_alphabot_v2.py | 265 +++++++++++++++++ tmp/stm32_bleAdvertising.py | 91 ++++++ tmp/stm32_ble_uart.py | 118 ++++++++ tmp/stm32_ir_receiver.py | 69 +++++ tmp/stm32_nec.py | 62 ++++ tmp/stm32_pcf8574.py | 49 ++++ tmp/stm32_ssd1306.py | 131 +++++++++ tmp/stm32_vl53l0x.py | 525 ++++++++++++++++++++++++++++++++++ 34 files changed, 4602 insertions(+), 4 deletions(-) create mode 100644 Interpreteur.py create mode 100644 Path.asm delete mode 100644 test_bin_epreuve3/cmp_reg.bin delete mode 100644 test_bin_epreuve3/push.bin create mode 100644 tmp/Interpreteur.py create mode 100644 tmp/README.txt create mode 100644 tmp/aioble/__init__.py create mode 100644 tmp/aioble/central.py create mode 100644 tmp/aioble/client.py create mode 100644 tmp/aioble/core.py create mode 100644 tmp/aioble/device.py create mode 100644 tmp/aioble/l2cap.py create mode 100644 tmp/aioble/peripheral.py create mode 100644 tmp/aioble/security.py create mode 100644 tmp/aioble/server.py create mode 100644 tmp/boot.py create mode 100644 tmp/buzzer.py create mode 100644 tmp/main.py create mode 100644 tmp/neopixel.py create mode 100644 tmp/out.bin create mode 100644 tmp/pybcdc.inf create mode 100644 tmp/stm32_TRsensors.py create mode 100644 tmp/stm32_alphabot_v2.py create mode 100644 tmp/stm32_bleAdvertising.py create mode 100644 tmp/stm32_ble_uart.py create mode 100644 tmp/stm32_ir_receiver.py create mode 100644 tmp/stm32_nec.py create mode 100644 tmp/stm32_pcf8574.py create mode 100644 tmp/stm32_ssd1306.py create mode 100644 tmp/stm32_vl53l0x.py diff --git a/Interpreteur.py b/Interpreteur.py new file mode 100644 index 0000000..3c05627 --- /dev/null +++ b/Interpreteur.py @@ -0,0 +1,291 @@ +# --------------------------------------------------------- +# Simulateur +# --------------------------------------------------------- +# - Bus données : 8 bits +# - 4 registres R0..R3 (8 bits) +# - Bus adresse : 8 bits +# - RAM : 256 octets +# - Instructions : 1 ou 2 octets +# - Cycles : 1 octet -> 1, 2 octets -> 2, LDR/STR -> 3 +# - PC démarre à 0 +# - Pile descendante, SP=255 +# --------------------------------------------------------- + +import sys, time + + +class CPU: + pc: int = 0 + sp: int = 255 + regs: list = [0, 0, 0, 0] # R0..R3 + lt: int = 0 # flag LT + eq: int = 0 # flag EQ + cycles: int = 0 + running: bool = True + after_ret: bool = False + + def __post_init__(self): + if not self.regs: + self.regs = [0, 0, 0, 0] + + +class Simulator: + def __init__(self, program: bytes): + self.ram = bytearray(256) + for i, b in enumerate(program[:256]): + self.ram[i] = b + self.cpu = CPU() + self.program_size = len(program) + self.motorCallback = None + + # ---- Getter / Setter pour interfacer le robot + + def setMotorCallback(self, callback): + self.motorCallback = callback + + + # ----------------- utilitaires mémoire / pile ----------------- + + def fetch_byte(self) -> int: + b = self.ram[self.cpu.pc] + self.cpu.pc = (self.cpu.pc + 1) & 0xFF + return b + + def push(self, value: int): + if self.cpu.sp < 0: + raise RuntimeError("STACK OVERFLOW") + self.ram[self.cpu.sp] = value & 0xFF + self.cpu.sp -= 1 + + def pop(self) -> int: + if self.cpu.sp >= 255: + return 0 + self.cpu.sp += 1 + return self.ram[self.cpu.sp] + + # ----------------- exécution d'une instruction ----------------- + + def step(self): + c = self.cpu + pc_before = c.pc + b = self.fetch_byte() + + instr = "" + size = 1 # taille en octets (1 ou 2) + extra_cycles = 0 # pour LDR/STR/TIM + + # --- instructions 2 octets à opcode fixe --- + #print(pc_before) + #print(self.program_size) + if c.after_ret: + instr = f"DB 0x{b:02X}" + + elif b == 0x00: # CALL _label + addr = self.fetch_byte() + size = 2 + instr = f"CALL {addr}" + self.push(c.pc) + c.pc = addr + + elif b == 0x40: # JMP _label + addr = self.fetch_byte() + size = 2 + instr = f"JMP {addr}" + c.pc = addr + + elif b == 0xC0: # JLT _label + addr = self.fetch_byte() + size = 2 + instr = f"JLT {addr}" + if c.lt == 1: + c.pc = addr + + elif b == 0x20: # JEQ _label + addr = self.fetch_byte() + size = 2 + instr = f"JEQ {addr}" + if c.eq == 1: + c.pc = addr + + elif b == 0x80: # RET + instr = "RET" + ret = self.pop() + if c.sp >= 255 and ret == 0: + c.after_ret = True + c.running = False + else: + c.pc = ret + + # --- PUSH / POP --- + elif (b & 0b11111100) == 0b10100000: # PUSH Rx + r = b & 0b11 + instr = f"PUSH R{r}" + self.push(c.regs[r]) + + elif (b & 0b11111100) == 0b01100000: # POP Rx + r = b & 0b11 + instr = f"POP R{r}" + c.regs[r] = self.pop() + + # --- MOV Rx valeur / SUB Rx valeur / CMP Rx valeur --- + elif (b & 0b11111100) == 0b11100000: # MOV Rx valeur + r = b & 0b11 + imm = self.fetch_byte() + size = 2 + instr = f"MOV R{r}, {imm}" + c.regs[r] = imm + + elif (b & 0b11111100) == 0b00010000: # SUB Rx valeur + r = b & 0b11 + imm = self.fetch_byte() + size = 2 + instr = f"SUB R{r}, {imm}" + c.regs[r] = (c.regs[r] - imm) & 0xFF + + elif (b & 0b11111100) == 0b10010000: # CMP Rx valeur + r = b & 0b11 + imm = self.fetch_byte() + size = 2 + instr = f"CMP R{r}, {imm}" + v = c.regs[r] + c.lt = 1 if v < imm else 0 + c.eq = 1 if v == imm else 0 + + # --- MOV / SUB / CMP registre-registre --- + elif (b & 0b11110000) == 0b01010000: # MOV Rx Ry + dst = (b >> 2) & 0b11 + src = b & 0b11 + instr = f"MOV R{dst}, R{src}" + c.regs[dst] = c.regs[src] + + elif (b & 0b11110000) == 0b11010000: # SUB Rx Ry + dst = (b >> 2) & 0b11 + src = b & 0b11 + instr = f"SUB R{dst}, R{src}" + c.regs[dst] = (c.regs[dst] - c.regs[src]) & 0xFF + + elif (b & 0b11110000) == 0b00110000: # CMP Rx Ry + dst = (b >> 2) & 0b11 + src = b & 0b11 + instr = f"CMP R{dst}, R{src}" + v1 = c.regs[dst] + v2 = c.regs[src] + c.lt = 1 if v1 < v2 else 0 + c.eq = 1 if v1 == v2 else 0 + + # --- LDR / STR (2 octets, 3 cycles) --- + elif (b & 0b11110000) == 0b10110000: # LDR Rx Ry _label + dst = (b >> 2) & 0b11 + src = b & 0b11 + addr = self.fetch_byte() + size = 2 + instr = f"LDR R{dst}, R{src}, {addr}" + eff = (addr + c.regs[src]) & 0xFF + c.regs[dst] = self.ram[eff] + extra_cycles = 1 # 2 octets -> 2 cycles +1 = 3 + + elif (b & 0b11110000) == 0b01110000: # STR Rx Ry _label + dst = (b >> 2) & 0b11 + src = b & 0b11 + addr = self.fetch_byte() + size = 2 + instr = f"STR R{dst}, R{src}, {addr}" + eff = (addr + c.regs[src]) & 0xFF + self.ram[eff] = c.regs[dst] & 0xFF + extra_cycles = 1 + + # --- OUT Rx --- + elif (b & 0b11111100) == 0b11110000: # OUT Rx + r = b & 0b11 + instr = f"OUT R{r}" + registre = c.regs[r] + print(f"[OUT] R{r} = {registre}") + if (self.motorCallback != None): + motG = (registre >> 4) & 0b1111 + motD = (registre) & 0b1111 + self.motorCallback(motG, motD) + + # --- TIM valeur --- + elif b == 0xF8: # TIM + second = self.fetch_byte() + size = 2 + m = (second >> 7) & 0x1 + v = second & 0x7F + instr = f"TIM m={m}, v={v}" + mult = 1 if m == 0 else 100 + pause_ms = mult * (v + 1) + c.cycles += pause_ms # modélisation de la pause + print(f"Sleep {pause_ms}ms...") + time.sleep(pause_ms/1000) + print("BIPBIP") + + # if pc_before >= self.program_size: + # if 32 <= b <= 126: + # instr = f"DB 0x{b:02X} ('{chr(b)}')" + # else: + # instr = f"DB 0x{b:02X}" + else: + instr = f"UNKNOWN 0x{b:02X}" + c.running = False + + + + # calcul des cycles + if (b & 0b11110000) in (0xB0, 0x70): # LDR / STR + c.cycles += 3 + cycles_added = 3 + else: + c.cycles += size + cycles_added = size + + self.report(pc_before, instr, cycles_added) + + # ----------------- rapport d'exécution ----------------- + + def report(self, pc_before: int, instr: str, cycles_added: int): + c = self.cpu + regs_str = " ".join(f"R{i}={c.regs[i]:02X}" for i in range(4)) + print(f"PC={pc_before:02X} {instr:20s} +Cycles={cycles_added:3d} Total={c.cycles}") + print(f" {regs_str} LT={c.lt} EQ={c.eq} SP={c.sp}") + print("-" * 60) + + # ----------------- boucle principale ----------------- + + def run(self, max_steps: int = 100000): + steps = 0 + while self.cpu.running and steps < max_steps: + self.step() + steps += 1 + +def motorCallback(motG, motD): + sMotG = 1 - ((motG >> 2) & 0b10) + sMotD = 1 - ((motD >> 2) & 0b10) + motG = ((motG & 0b0111) * sMotG) * 100 / 7 + motD = ((motD & 0b0111) * sMotD) * 100 / 7 + print("Mot G :", motG) + print("Mot D :", motD) + + +def StartCPU(program, callback): + sim = Simulator(program) + sim.setMotorCallback(callback) + while sim.cpu.running: + sim.run(max_steps = 1) + time.sleep(0.1) + +import time +# --------------------------------------------------------- +# LECTURE D'UN FICHIER .bin ET LANCEMENT +# --------------------------------------------------------- +if __name__ == "__main__": + # Nom du fichier binaire à exécuter + path ="" + args= sys.argv + if (len(args) > 1): + filename = args[1] + print("filename: " + filename) + with open(filename, "rb") as f: + program = f.read() + StartCPU(program, motorCallback) + else: + print("Needs *.bin as parameter") diff --git a/Path.asm b/Path.asm new file mode 100644 index 0000000..1a1b273 --- /dev/null +++ b/Path.asm @@ -0,0 +1,41 @@ +_main: + + ; Start + MOV R0 59 + OUT R0 + TIM 132 ; Demi-tour OK + + MOV R0 0 + OUT R0 + TIM 100 ; P'tite pause OK + + MOV R0 59 + OUT R0 + TIM 128 + TIM 100 ; Demi-tour pour marche arriere OK + + MOV R0 0 + OUT R0 + TIM 100 ; P'tite pause OK + + MOV R0 204 + OUT R0 + TIM 142 ; Michael Jackson OK + + MOV R0 0 + OUT R0 + TIM 100 ; P'tite pause OK + + MOV R0 179 + OUT R0 + TIM 132 ; Demi-tour + + MOV R0 0 + OUT R0 + TIM 100 ; P'tite pause + + MOV R0 204 + OUT R0 + TIM 144 ; Michael Jackson + + RET diff --git a/assembleur.py b/assembleur.py index eac6c54..818d2de 100644 --- a/assembleur.py +++ b/assembleur.py @@ -436,7 +436,7 @@ if (__name__ == "__main__"): code = assemble(path) - with open(path + ".bin", "wb") as file: + with open("out.bin", "wb") as file: file.write(bytes(code)) exit(0) diff --git a/main.py b/main.py index b7c7d35..a3e09b1 100644 --- a/main.py +++ b/main.py @@ -1 +1,119 @@ -# main.py -- put your code here! +import machine +import utime, sys +from stm32_ssd1306 import SSD1306, SSD1306_I2C +from stm32_alphabot_v2 import AlphaBot_v2 +import neopixel +import _thread +import os +import buzzer + +import binascii +import uasyncio as asyncio +import RobotBleServer +#import base64 + + +from Interpreteur import StartCPU + +motorSpeedFactor = 50 + +alphabot = AlphaBot_v2() +oled = SSD1306_I2C(128, 64, alphabot.i2c) + +oled.fill(0) +oled.show() + +def motorCallback(motG, motD): + sMotG = 1 - ((motG >> 2) & 0b10) + sMotD = 1 - ((motD >> 2) & 0b10) + motG = ((motG & 0b0111) * sMotG) * motorSpeedFactor / 7 + motD = ((motD & 0b0111) * sMotD) * motorSpeedFactor / 7 + print("Mot G :", motG) + print("Mot D :", motD) + alphabot.setMotors(left=motG, right=motD) + + +# while True: +# joystickButton = alphabot.getJoystickValue() +# if (joystickButton == "center"): +# oled.text("Coucou !", 0, 0) +# oled.show() +# utime.sleep(1) +# oled.fill(0) +# oled.show() +# if (joystickButton == "left"): +# alphabot.setMotors(left=-100, right=100) +# utime.sleep(1) +# alphabot.stop() +# if (joystickButton == "right"): +# StartCPU("./out.bin", motorCallback) +# alphabot.stop() + + + + + + + + +# to know COM port used when connected on PC: +# python -m serial.tools.list_ports + +# in this example, robot will send back to PC the checksum of each message received + + +robotName = 'Nogard' +toSend = [] + +def onMsgToRobot(data:str|bytes): + """Function to call when a message sent by PC is received + :param data: message received""" + checksum = binascii.crc32(data) + print('received', data, '=>', checksum) + #data = data.encode("ascii") + #data = base64.decodebytes(data) + print(data) + + + + StartCPU(data, motorCallback) + alphabot.stop() + + +async def robotMainTask(bleConnection): + """Main function for robot activities + :param bleConnection: object to check BLE connection and send messages""" + while True: + await asyncio.sleep(0.1) + #print('connection', bleConnection.connection) + if not bleConnection.connection: continue + if toSend == []: continue + while not toSend == []: + data = toSend.pop(0) + bleConnection.sendMessage(data) + print('sent', data) + +# Run tasks +async def main(): + print('Start main') + bleConnection = RobotBleServer.RobotBleServer(robotName=robotName, onMsgReceived=onMsgToRobot) + asyncio.create_task(robotMainTask(bleConnection)) + await bleConnection.communicationTask() + +asyncio.run(main()) + + + + + + + + + + + + + + + + diff --git a/notes.txt b/notes.txt index 1ac8d6a..23db984 100644 --- a/notes.txt +++ b/notes.txt @@ -144,3 +144,33 @@ OUT Rx ;génération binaire 111100xx avec xx = R0, R1,R2,R3 +TIM valeur ;génération binaire 11111000 mvvvvvvv + ;met dans le registre de Timer la valeur mvvvvvvv + ;le processeur se met en pause pendant multiplicateur*(vvvvvvv+1) + ; en millisecondes + ;vvvvvvv valeur de 0x0 (représente 1) à 0x7F (represente 128) + ;m=0 (multiplicateur = 1) ou m=1 (multiplicateur = 100) + + + +1000ms = 85cm (vitesse 3) +toupie 76 100ms = 360 + 90° + +42 +76 + + +{"type": "connect", "name": "Nogard"} + +{"type": "msg", "format": "base64", "string": "4Crw+FCA"} + +Demi tour +{"type": "msg", "format": "base64", "string": "4Dvw+IHgAPCA"} + +Demi tour + fuite +{"type": "msg", "format": "base64", "string": "4Dvw+ID4MuAA8Phk4Mzw+KXgAPDgAPD4ZIA="} + +Parcours +{"type": "msg", "format": "base64", "string": "4Dvw+ITgAPD4ZOA78PiA+GTgAPD4ZODM8PiO4ADw+GTgs/D4hOAA8Phk4Mzw+JCA"} + + diff --git a/out.bin b/out.bin index a223f5021a35708fc118eda8f44bf079f0cea8da..9e9b4d0bac8725a61a016ce729d4bc8e5814a72a 100644 GIT binary patch literal 48 mcmaE0{ozN;1BMSjQXT+V4L}OaKJ(#6AB43TE)Nu&&;S52>LJ_! literal 12 TcmZQzYhd7PU|?=wU}yjU3#9@R diff --git a/test_bin_epreuve3/cmp_reg.bin b/test_bin_epreuve3/cmp_reg.bin deleted file mode 100644 index 30c6288..0000000 --- a/test_bin_epreuve3/cmp_reg.bin +++ /dev/null @@ -1 +0,0 @@ -0123456789:;<=>? \ No newline at end of file diff --git a/test_bin_epreuve3/push.bin b/test_bin_epreuve3/push.bin deleted file mode 100644 index 6395729..0000000 --- a/test_bin_epreuve3/push.bin +++ /dev/null @@ -1 +0,0 @@ - ¡¢£ \ No newline at end of file diff --git a/tmp/Interpreteur.py b/tmp/Interpreteur.py new file mode 100644 index 0000000..76cf74b --- /dev/null +++ b/tmp/Interpreteur.py @@ -0,0 +1,293 @@ +# --------------------------------------------------------- +# Simulateur +# --------------------------------------------------------- +# - Bus données : 8 bits +# - 4 registres R0..R3 (8 bits) +# - Bus adresse : 8 bits +# - RAM : 256 octets +# - Instructions : 1 ou 2 octets +# - Cycles : 1 octet -> 1, 2 octets -> 2, LDR/STR -> 3 +# - PC démarre à 0 +# - Pile descendante, SP=255 +# --------------------------------------------------------- + +import sys, time + + +class CPU: + pc: int = 0 + sp: int = 255 + regs: list = [0, 0, 0, 0] # R0..R3 + lt: int = 0 # flag LT + eq: int = 0 # flag EQ + cycles: int = 0 + running: bool = True + after_ret: bool = False + + def __post_init__(self): + if not self.regs: + self.regs = [0, 0, 0, 0] + + +class Simulator: + def __init__(self, program: bytes): + self.ram = bytearray(256) + for i, b in enumerate(program[:256]): + self.ram[i] = b + self.cpu = CPU() + self.program_size = len(program) + self.motorCallback = None + + # ---- Getter / Setter pour interfacer le robot + + def setMotorCallback(self, callback): + self.motorCallback = callback + + + # ----------------- utilitaires mémoire / pile ----------------- + + def fetch_byte(self) -> int: + b = self.ram[self.cpu.pc] + self.cpu.pc = (self.cpu.pc + 1) & 0xFF + return b + + def push(self, value: int): + if self.cpu.sp < 0: + raise RuntimeError("STACK OVERFLOW") + self.ram[self.cpu.sp] = value & 0xFF + self.cpu.sp -= 1 + + def pop(self) -> int: + if self.cpu.sp >= 255: + return 0 + self.cpu.sp += 1 + return self.ram[self.cpu.sp] + + # ----------------- exécution d'une instruction ----------------- + + def step(self): + c = self.cpu + pc_before = c.pc + b = self.fetch_byte() + + instr = "" + size = 1 # taille en octets (1 ou 2) + extra_cycles = 0 # pour LDR/STR/TIM + + # --- instructions 2 octets à opcode fixe --- + #print(pc_before) + #print(self.program_size) + if c.after_ret: + instr = f"DB 0x{b:02X}" + + elif b == 0x00: # CALL _label + addr = self.fetch_byte() + size = 2 + instr = f"CALL {addr}" + self.push(c.pc) + c.pc = addr + + elif b == 0x40: # JMP _label + addr = self.fetch_byte() + size = 2 + instr = f"JMP {addr}" + c.pc = addr + + elif b == 0xC0: # JLT _label + addr = self.fetch_byte() + size = 2 + instr = f"JLT {addr}" + if c.lt == 1: + c.pc = addr + + elif b == 0x20: # JEQ _label + addr = self.fetch_byte() + size = 2 + instr = f"JEQ {addr}" + if c.eq == 1: + c.pc = addr + + elif b == 0x80: # RET + instr = "RET" + ret = self.pop() + if c.sp >= 255 and ret == 0: + c.after_ret = True + c.running = False + else: + c.pc = ret + + # --- PUSH / POP --- + elif (b & 0b11111100) == 0b10100000: # PUSH Rx + r = b & 0b11 + instr = f"PUSH R{r}" + self.push(c.regs[r]) + + elif (b & 0b11111100) == 0b01100000: # POP Rx + r = b & 0b11 + instr = f"POP R{r}" + c.regs[r] = self.pop() + + # --- MOV Rx valeur / SUB Rx valeur / CMP Rx valeur --- + elif (b & 0b11111100) == 0b11100000: # MOV Rx valeur + r = b & 0b11 + imm = self.fetch_byte() + size = 2 + instr = f"MOV R{r}, {imm}" + c.regs[r] = imm + + elif (b & 0b11111100) == 0b00010000: # SUB Rx valeur + r = b & 0b11 + imm = self.fetch_byte() + size = 2 + instr = f"SUB R{r}, {imm}" + c.regs[r] = (c.regs[r] - imm) & 0xFF + + elif (b & 0b11111100) == 0b10010000: # CMP Rx valeur + r = b & 0b11 + imm = self.fetch_byte() + size = 2 + instr = f"CMP R{r}, {imm}" + v = c.regs[r] + c.lt = 1 if v < imm else 0 + c.eq = 1 if v == imm else 0 + + # --- MOV / SUB / CMP registre-registre --- + elif (b & 0b11110000) == 0b01010000: # MOV Rx Ry + dst = (b >> 2) & 0b11 + src = b & 0b11 + instr = f"MOV R{dst}, R{src}" + c.regs[dst] = c.regs[src] + + elif (b & 0b11110000) == 0b11010000: # SUB Rx Ry + dst = (b >> 2) & 0b11 + src = b & 0b11 + instr = f"SUB R{dst}, R{src}" + c.regs[dst] = (c.regs[dst] - c.regs[src]) & 0xFF + + elif (b & 0b11110000) == 0b00110000: # CMP Rx Ry + dst = (b >> 2) & 0b11 + src = b & 0b11 + instr = f"CMP R{dst}, R{src}" + v1 = c.regs[dst] + v2 = c.regs[src] + c.lt = 1 if v1 < v2 else 0 + c.eq = 1 if v1 == v2 else 0 + + # --- LDR / STR (2 octets, 3 cycles) --- + elif (b & 0b11110000) == 0b10110000: # LDR Rx Ry _label + dst = (b >> 2) & 0b11 + src = b & 0b11 + addr = self.fetch_byte() + size = 2 + instr = f"LDR R{dst}, R{src}, {addr}" + eff = (addr + c.regs[src]) & 0xFF + c.regs[dst] = self.ram[eff] + extra_cycles = 1 # 2 octets -> 2 cycles +1 = 3 + + elif (b & 0b11110000) == 0b01110000: # STR Rx Ry _label + dst = (b >> 2) & 0b11 + src = b & 0b11 + addr = self.fetch_byte() + size = 2 + instr = f"STR R{dst}, R{src}, {addr}" + eff = (addr + c.regs[src]) & 0xFF + self.ram[eff] = c.regs[dst] & 0xFF + extra_cycles = 1 + + # --- OUT Rx --- + elif (b & 0b11111100) == 0b11110000: # OUT Rx + r = b & 0b11 + instr = f"OUT R{r}" + registre = c.regs[r] + print(f"[OUT] R{r} = {registre}") + if (self.motorCallback != None): + motG = (registre >> 4) & 0b1111 + motD = (registre) & 0b1111 + self.motorCallback(motG, motD) + + # --- TIM valeur --- + elif b == 0xF8: # TIM + second = self.fetch_byte() + size = 2 + m = (second >> 7) & 0x1 + v = second & 0x7F + instr = f"TIM m={m}, v={v}" + mult = 1 if m == 0 else 100 + pause_ms = mult * (v + 1) + c.cycles += pause_ms # modélisation de la pause + print(f"Sleep {pause_ms}ms...") + time.sleep(pause_ms/1000) + print("BIPBIP") + + # if pc_before >= self.program_size: + # if 32 <= b <= 126: + # instr = f"DB 0x{b:02X} ('{chr(b)}')" + # else: + # instr = f"DB 0x{b:02X}" + else: + instr = f"UNKNOWN 0x{b:02X}" + c.running = False + + + + # calcul des cycles + if (b & 0b11110000) in (0xB0, 0x70): # LDR / STR + c.cycles += 3 + cycles_added = 3 + else: + c.cycles += size + cycles_added = size + + self.report(pc_before, instr, cycles_added) + + # ----------------- rapport d'exécution ----------------- + + def report(self, pc_before: int, instr: str, cycles_added: int): + c = self.cpu + regs_str = " ".join(f"R{i}={c.regs[i]:02X}" for i in range(4)) + print(f"PC={pc_before:02X} {instr:20s} +Cycles={cycles_added:3d} Total={c.cycles}") + print(f" {regs_str} LT={c.lt} EQ={c.eq} SP={c.sp}") + print("-" * 60) + + # ----------------- boucle principale ----------------- + + def run(self, max_steps: int = 100000): + steps = 0 + while self.cpu.running and steps < max_steps: + self.step() + steps += 1 + + + +def motorCallback(motG, motD): + sMotG = 1 - ((motG >> 2) & 0b10) + sMotD = 1 - ((motD >> 2) & 0b10) + motG = ((motG & 0b0111) * sMotG) * 100 / 7 + motD = ((motD & 0b0111) * sMotD) * 100 / 7 + print("Mot G :", motG) + print("Mot D :", motD) + + +def StartCPU(filename, callback): + with open(filename, "rb") as f: + program = f.read() + sim = Simulator(program) + sim.setMotorCallback(callback) + while sim.cpu.running: + sim.run(max_steps = 1) + time.sleep(0.1) + +import time +# --------------------------------------------------------- +# LECTURE D'UN FICHIER .bin ET LANCEMENT +# --------------------------------------------------------- +if __name__ == "__main__": + # Nom du fichier binaire à exécuter + path ="" + args= sys.argv + if (len(args) > 1): + filename = args[1] + print("filename: " + filename) + StartCPU(filename, motorCallback) + else: + print("Needs *.bin as parameter") diff --git a/tmp/README.txt b/tmp/README.txt new file mode 100644 index 0000000..4e9dc35 --- /dev/null +++ b/tmp/README.txt @@ -0,0 +1,12 @@ +This is a MicroPython board + +You can get started right away by writing your Python code in 'main.py'. + +For a serial prompt: + - Windows: you need to go to 'Device manager', right click on the unknown device, + then update the driver software, using the 'pybcdc.inf' file found on this drive. + Then use a terminal program like Hyperterminal or putty. + - Mac OS X: use the command: screen /dev/tty.usbmodem* + - Linux: use the command: screen /dev/ttyACM0 + +For online docs please visit http://docs.micropython.org/ diff --git a/tmp/aioble/__init__.py b/tmp/aioble/__init__.py new file mode 100644 index 0000000..dde89f5 --- /dev/null +++ b/tmp/aioble/__init__.py @@ -0,0 +1,32 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +from .device import Device, DeviceDisconnectedError +from .core import log_info, log_warn, log_error, GattError, config, stop + +try: + from .peripheral import advertise +except: + log_info("Peripheral support disabled") + +try: + from .central import scan +except: + log_info("Central support disabled") + +try: + from .server import ( + Service, + Characteristic, + BufferedCharacteristic, + Descriptor, + register_services, + ) +except: + log_info("GATT server support disabled") + + +ADDR_PUBLIC = const(0) +ADDR_RANDOM = const(1) diff --git a/tmp/aioble/central.py b/tmp/aioble/central.py new file mode 100644 index 0000000..adfc972 --- /dev/null +++ b/tmp/aioble/central.py @@ -0,0 +1,297 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import bluetooth +import struct + +import uasyncio as asyncio + +from .core import ( + ensure_active, + ble, + log_info, + log_error, + log_warn, + register_irq_handler, +) +from .device import Device, DeviceConnection, DeviceTimeout + + +_IRQ_SCAN_RESULT = const(5) +_IRQ_SCAN_DONE = const(6) + +_IRQ_PERIPHERAL_CONNECT = const(7) +_IRQ_PERIPHERAL_DISCONNECT = const(8) + +_ADV_IND = const(0) +_ADV_DIRECT_IND = const(1) +_ADV_SCAN_IND = const(2) +_ADV_NONCONN_IND = const(3) +_SCAN_RSP = const(4) + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_SHORT_NAME = const(0x08) +_ADV_TYPE_UUID16_INCOMPLETE = const(0x2) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_UUID32_INCOMPLETE = const(0x4) +_ADV_TYPE_UUID32_COMPLETE = const(0x5) +_ADV_TYPE_UUID128_INCOMPLETE = const(0x6) +_ADV_TYPE_UUID128_COMPLETE = const(0x7) +_ADV_TYPE_APPEARANCE = const(0x19) +_ADV_TYPE_MANUFACTURER = const(0xFF) + + +# Keep track of the active scanner so IRQs can be delivered to it. +_active_scanner = None + + +# Set of devices that are waiting for the peripheral connect IRQ. +_connecting = set() + + +def _central_irq(event, data): + # Send results and done events to the active scanner instance. + if event == _IRQ_SCAN_RESULT: + addr_type, addr, adv_type, rssi, adv_data = data + if not _active_scanner: + return + _active_scanner._queue.append((addr_type, bytes(addr), adv_type, rssi, bytes(adv_data))) + _active_scanner._event.set() + elif event == _IRQ_SCAN_DONE: + if not _active_scanner: + return + _active_scanner._done = True + _active_scanner._event.set() + + # Peripheral connect must be in response to a pending connection, so find + # it in the pending connection set. + elif event == _IRQ_PERIPHERAL_CONNECT: + conn_handle, addr_type, addr = data + + for d in _connecting: + if d.addr_type == addr_type and d.addr == addr: + # Allow connect() to complete. + connection = d._connection + connection._conn_handle = conn_handle + connection._event.set() + break + + # Find the active device connection for this connection handle. + elif event == _IRQ_PERIPHERAL_DISCONNECT: + conn_handle, _, _ = data + if connection := DeviceConnection._connected.get(conn_handle, None): + # Tell the device_task that it should terminate. + connection._event.set() + + +def _central_shutdown(): + global _active_scanner, _connecting + _active_scanner = None + _connecting = set() + + +register_irq_handler(_central_irq, _central_shutdown) + + +# Cancel an in-progress scan. +async def _cancel_pending(): + if _active_scanner: + await _active_scanner.cancel() + + +# Start connecting to a peripheral. +# Call device.connect() rather than using method directly. +async def _connect(connection, timeout_ms): + device = connection.device + if device in _connecting: + return + + # Enable BLE and cancel in-progress scans. + ensure_active() + await _cancel_pending() + + # Allow the connected IRQ to find the device by address. + _connecting.add(device) + + # Event will be set in the connected IRQ, and then later + # re-used to notify disconnection. + connection._event = connection._event or asyncio.ThreadSafeFlag() + + try: + with DeviceTimeout(None, timeout_ms): + ble.gap_connect(device.addr_type, device.addr) + + # Wait for the connected IRQ. + await connection._event.wait() + assert connection._conn_handle is not None + + # Register connection handle -> device. + DeviceConnection._connected[connection._conn_handle] = connection + finally: + # After timeout, don't hold a reference and ignore future events. + _connecting.remove(device) + + +# Represents a single device that has been found during a scan. The scan +# iterator will return the same ScanResult instance multiple times as its data +# changes (i.e. changing RSSI or advertising data). +class ScanResult: + def __init__(self, device): + self.device = device + self.adv_data = None + self.resp_data = None + self.rssi = None + self.connectable = False + + # New scan result available, return true if it changes our state. + def _update(self, adv_type, rssi, adv_data): + updated = False + + if rssi != self.rssi: + self.rssi = rssi + updated = True + + if adv_type in (_ADV_IND, _ADV_NONCONN_IND): + if adv_data != self.adv_data: + self.adv_data = adv_data + self.connectable = adv_type == _ADV_IND + updated = True + elif adv_type == _ADV_SCAN_IND: + if adv_data != self.adv_data and self.resp_data: + updated = True + self.adv_data = adv_data + elif adv_type == _SCAN_RSP and adv_data: + if adv_data != self.resp_data: + self.resp_data = adv_data + updated = True + + return updated + + def __str__(self): + return "Scan result: {} {}".format(self.device, self.rssi) + + # Gets all the fields for the specified types. + def _decode_field(self, *adv_type): + # Advertising payloads are repeated packets of the following form: + # 1 byte data length (N + 1) + # 1 byte type (see constants below) + # N bytes type-specific data + for payload in (self.adv_data, self.resp_data): + if not payload: + continue + i = 0 + while i + 1 < len(payload): + if payload[i + 1] in adv_type: + yield payload[i + 2 : i + payload[i] + 1] + i += 1 + payload[i] + + # Returns the value of the complete (or shortened) advertised name, if available. + def name(self): + for n in self._decode_field(_ADV_TYPE_NAME, _ADV_TYPE_SHORT_NAME): + return str(n, "utf-8") if n else "" + + # Generator that enumerates the service UUIDs that are advertised. + def services(self): + for u in self._decode_field(_ADV_TYPE_UUID16_INCOMPLETE, _ADV_TYPE_UUID16_COMPLETE): + yield bluetooth.UUID(struct.unpack(" value_handle else value_handle + 2 + + super().__init__(value_handle, properties, uuid) + + if properties & _FLAG_NOTIFY: + # Fired when a notification arrives. + self._notify_event = asyncio.ThreadSafeFlag() + # Data for the most recent notification. + self._notify_queue = deque((), 1) + if properties & _FLAG_INDICATE: + # Same for indications. + self._indicate_event = asyncio.ThreadSafeFlag() + self._indicate_queue = deque((), 1) + + def __str__(self): + return "Characteristic: {} {} {} {}".format( + self._end_handle, self._value_handle, self.properties, self.uuid + ) + + def _connection(self): + return self.service.connection + + # Search for a specific descriptor by uuid. + async def descriptor(self, uuid, timeout_ms=2000): + result = None + # Make sure loop runs to completion. + async for descriptor in self.descriptors(timeout_ms): + if not result and descriptor.uuid == uuid: + # Keep first result. + result = descriptor + return result + + # Search for all services (optionally by uuid). + # Use with `async for`, e.g. + # async for descriptor in characteristic.descriptors(): + # Note: must allow the loop to run to completion. + def descriptors(self, timeout_ms=2000): + return ClientDiscover(self.connection, ClientDescriptor, self, timeout_ms) + + # For ClientDiscover + def _start_discovery(service, uuid=None): + ble.gattc_discover_characteristics( + service.connection._conn_handle, + service._start_handle, + service._end_handle, + uuid, + ) + + # Helper for notified() and indicated(). + async def _notified_indicated(self, queue, event, timeout_ms): + # Ensure that events for this connection can route to this characteristic. + self._register_with_connection() + + # If the queue is empty, then we need to wait. However, if the queue + # has a single item, we also need to do a no-op wait in order to + # clear the event flag (because the queue will become empty and + # therefore the event should be cleared). + if len(queue) <= 1: + with self._connection().timeout(timeout_ms): + await event.wait() + + # Either we started > 1 item, or the wait completed successfully, return + # the front of the queue. + return queue.popleft() + + # Wait for the next notification. + # Will return immediately if a notification has already been received. + async def notified(self, timeout_ms=None): + self._check(_FLAG_NOTIFY) + return await self._notified_indicated(self._notify_queue, self._notify_event, timeout_ms) + + def _on_notify_indicate(self, queue, event, data): + # If we've gone from empty to one item, then wake something + # blocking on `await char.notified()` (or `await char.indicated()`). + wake = len(queue) == 0 + # Append the data. By default this is a deque with max-length==1, so it + # replaces. But if capture is enabled then it will append. + queue.append(data) + if wake: + # Queue is now non-empty. If something is waiting, it will be + # worken. If something isn't waiting right now, then a future + # caller to `await char.written()` will see the queue is + # non-empty, and wait on the event if it's going to empty the + # queue. + event.set() + + # Map an incoming notify IRQ to a registered characteristic. + def _on_notify(conn_handle, value_handle, notify_data): + if characteristic := ClientCharacteristic._find(conn_handle, value_handle): + characteristic._on_notify_indicate( + characteristic._notify_queue, characteristic._notify_event, notify_data + ) + + # Wait for the next indication. + # Will return immediately if an indication has already been received. + async def indicated(self, timeout_ms=None): + self._check(_FLAG_INDICATE) + return await self._notified_indicated( + self._indicate_queue, self._indicate_event, timeout_ms + ) + + # Map an incoming indicate IRQ to a registered characteristic. + def _on_indicate(conn_handle, value_handle, indicate_data): + if characteristic := ClientCharacteristic._find(conn_handle, value_handle): + characteristic._on_notify_indicate( + characteristic._indicate_queue, characteristic._indicate_event, indicate_data + ) + + # Write to the Client Characteristic Configuration to subscribe to + # notify/indications for this characteristic. + async def subscribe(self, notify=True, indicate=False): + # Ensure that the generated notifications are dispatched in case the app + # hasn't awaited on notified/indicated yet. + self._register_with_connection() + if cccd := await self.descriptor(bluetooth.UUID(_CCCD_UUID)): + await cccd.write(struct.pack(" 0: + print("[aioble] E:", *args) + + +def log_warn(*args): + if log_level > 1: + print("[aioble] W:", *args) + + +def log_info(*args): + if log_level > 2: + print("[aioble] I:", *args) + + +class GattError(Exception): + def __init__(self, status): + self._status = status + + +def ensure_active(): + if not ble.active(): + try: + from .security import load_secrets + + load_secrets() + except: + pass + ble.active(True) + + +def config(*args, **kwargs): + ensure_active() + return ble.config(*args, **kwargs) + + +# Because different functionality is enabled by which files are available the +# different modules can register their IRQ handlers and shutdown handlers +# dynamically. +_irq_handlers = [] +_shutdown_handlers = [] + + +def register_irq_handler(irq, shutdown): + if irq: + _irq_handlers.append(irq) + if shutdown: + _shutdown_handlers.append(shutdown) + + +def stop(): + ble.active(False) + for handler in _shutdown_handlers: + handler() + + +# Dispatch IRQs to the registered sub-modules. +def ble_irq(event, data): + log_info(event, data) + + for handler in _irq_handlers: + result = handler(event, data) + if result is not None: + return result + + +# TODO: Allow this to be injected. +ble = bluetooth.BLE() +ble.irq(ble_irq) diff --git a/tmp/aioble/device.py b/tmp/aioble/device.py new file mode 100644 index 0000000..265d621 --- /dev/null +++ b/tmp/aioble/device.py @@ -0,0 +1,295 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import uasyncio as asyncio +import binascii + +from .core import ble, register_irq_handler, log_error + + +_IRQ_MTU_EXCHANGED = const(21) + + +# Raised by `with device.timeout()`. +class DeviceDisconnectedError(Exception): + pass + + +def _device_irq(event, data): + if event == _IRQ_MTU_EXCHANGED: + conn_handle, mtu = data + if device := DeviceConnection._connected.get(conn_handle, None): + device.mtu = mtu + if device._mtu_event: + device._mtu_event.set() + + +register_irq_handler(_device_irq, None) + + +# Context manager to allow an operation to be cancelled by timeout or device +# disconnection. Don't use this directly -- use `with connection.timeout(ms):` +# instead. +class DeviceTimeout: + def __init__(self, connection, timeout_ms): + self._connection = connection + self._timeout_ms = timeout_ms + + # We allow either (or both) connection and timeout_ms to be None. This + # allows this to be used either as a just-disconnect, just-timeout, or + # no-op. + + # This task is active while the operation is in progress. It sleeps + # until the timeout, and then cancels the working task. If the working + # task completes, __exit__ will cancel the sleep. + self._timeout_task = None + + # This is the task waiting for the actual operation to complete. + # Usually this is waiting on an event that will be set() by an IRQ + # handler. + self._task = asyncio.current_task() + + # Tell the connection that if it disconnects, it should cancel this + # operation (by cancelling self._task). + if connection: + connection._timeouts.append(self) + + async def _timeout_sleep(self): + try: + await asyncio.sleep_ms(self._timeout_ms) + except asyncio.CancelledError: + # The operation completed successfully and this timeout task was + # cancelled by __exit__. + return + + # The sleep completed, so we should trigger the timeout. Set + # self._timeout_task to None so that we can tell the difference + # between a disconnect and a timeout in __exit__. + self._timeout_task = None + self._task.cancel() + + def __enter__(self): + if self._timeout_ms: + # Schedule the timeout waiter. + self._timeout_task = asyncio.create_task(self._timeout_sleep()) + + def __exit__(self, exc_type, exc_val, exc_traceback): + # One of five things happened: + # 1 - The operation completed successfully. + # 2 - The operation timed out. + # 3 - The device disconnected. + # 4 - The operation failed for a different exception. + # 5 - The task was cancelled by something else. + + # Don't need the connection to tell us about disconnection anymore. + if self._connection: + self._connection._timeouts.remove(self) + + try: + if exc_type == asyncio.CancelledError: + # Case 2, we started a timeout and it's completed. + if self._timeout_ms and self._timeout_task is None: + raise asyncio.TimeoutError + + # Case 3, we have a disconnected device. + if self._connection and self._connection._conn_handle is None: + raise DeviceDisconnectedError + + # Case 5, something else cancelled us. + # Allow the cancellation to propagate. + return + + # Case 1 & 4. Either way, just stop the timeout task and let the + # exception (if case 4) propagate. + finally: + # In all cases, if the timeout is still running, cancel it. + if self._timeout_task: + self._timeout_task.cancel() + + +class Device: + def __init__(self, addr_type, addr): + # Public properties + self.addr_type = addr_type + self.addr = addr if len(addr) == 6 else binascii.unhexlify(addr.replace(":", "")) + self._connection = None + + def __eq__(self, rhs): + return self.addr_type == rhs.addr_type and self.addr == rhs.addr + + def __hash__(self): + return hash((self.addr_type, self.addr)) + + def __str__(self): + return "Device({}, {}{})".format( + "ADDR_PUBLIC" if self.addr_type == 0 else "ADDR_RANDOM", + self.addr_hex(), + ", CONNECTED" if self._connection else "", + ) + + def addr_hex(self): + return binascii.hexlify(self.addr, ":").decode() + + async def connect(self, timeout_ms=10000): + if self._connection: + return self._connection + + # Forward to implementation in central.py. + from .central import _connect + + await _connect(DeviceConnection(self), timeout_ms) + + # Start the device task that will clean up after disconnection. + self._connection._run_task() + return self._connection + + +class DeviceConnection: + # Global map of connection handle to active devices (for IRQ mapping). + _connected = {} + + def __init__(self, device): + self.device = device + device._connection = self + + self.encrypted = False + self.authenticated = False + self.bonded = False + self.key_size = False + self.mtu = None + + self._conn_handle = None + + # This event is fired by the IRQ both for connection and disconnection + # and controls the device_task. + self._event = None + + # If we're waiting for a pending MTU exchange. + self._mtu_event = None + + # In-progress client discovery instance (e.g. services, chars, + # descriptors) used for IRQ mapping. + self._discover = None + # Map of value handle to characteristic (so that IRQs with + # conn_handle,value_handle can route to them). See + # ClientCharacteristic._find for where this is used. + self._characteristics = {} + + self._task = None + + # DeviceTimeout instances that are currently waiting on this device + # and need to be notified if disconnection occurs. + self._timeouts = [] + + # Fired by the encryption update event. + self._pair_event = None + + # Active L2CAP channel for this device. + # TODO: Support more than one concurrent channel. + self._l2cap_channel = None + + # While connected, this tasks waits for disconnection then cleans up. + async def device_task(self): + assert self._conn_handle is not None + + # Wait for the (either central or peripheral) disconnected irq. + await self._event.wait() + + # Mark the device as disconnected. + del DeviceConnection._connected[self._conn_handle] + self._conn_handle = None + self.device._connection = None + + # Cancel any in-progress operations on this device. + for t in self._timeouts: + t._task.cancel() + + def _run_task(self): + # Event will be already created this if we initiated connection. + self._event = self._event or asyncio.ThreadSafeFlag() + + self._task = asyncio.create_task(self.device_task()) + + async def disconnect(self, timeout_ms=2000): + await self.disconnected(timeout_ms, disconnect=True) + + async def disconnected(self, timeout_ms=60000, disconnect=False): + if not self.is_connected(): + return + + # The task must have been created after successful connection. + assert self._task + + if disconnect: + try: + ble.gap_disconnect(self._conn_handle) + except OSError as e: + log_error("Disconnect", e) + + with DeviceTimeout(None, timeout_ms): + await self._task + + # Retrieve a single service matching this uuid. + async def service(self, uuid, timeout_ms=2000): + result = None + # Make sure loop runs to completion. + async for service in self.services(uuid, timeout_ms): + if not result and service.uuid == uuid: + result = service + return result + + # Search for all services (optionally by uuid). + # Use with `async for`, e.g. + # async for service in device.services(): + # Note: must allow the loop to run to completion. + # TODO: disconnection / timeout + def services(self, uuid=None, timeout_ms=2000): + from .client import ClientDiscover, ClientService + + return ClientDiscover(self, ClientService, self, timeout_ms, uuid) + + async def pair(self, *args, **kwargs): + from .security import pair + + await pair(self, *args, **kwargs) + + def is_connected(self): + return self._conn_handle is not None + + # Use with `with` to simplify disconnection and timeout handling. + def timeout(self, timeout_ms): + return DeviceTimeout(self, timeout_ms) + + async def exchange_mtu(self, mtu=None, timeout_ms=1000): + if not self.is_connected(): + raise ValueError("Not connected") + + if mtu: + ble.config(mtu=mtu) + + self._mtu_event = self._mtu_event or asyncio.ThreadSafeFlag() + ble.gattc_exchange_mtu(self._conn_handle) + with self.timeout(timeout_ms): + await self._mtu_event.wait() + return self.mtu + + # Wait for a connection on an L2CAP connection-oriented-channel. + async def l2cap_accept(self, psm, mtu, timeout_ms=None): + from .l2cap import accept + + return await accept(self, psm, mtu, timeout_ms) + + # Attempt to connect to a listening device. + async def l2cap_connect(self, psm, mtu, timeout_ms=1000): + from .l2cap import connect + + return await connect(self, psm, mtu, timeout_ms) + + # Context manager -- automatically disconnect. + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_traceback): + await self.disconnect() diff --git a/tmp/aioble/l2cap.py b/tmp/aioble/l2cap.py new file mode 100644 index 0000000..713c441 --- /dev/null +++ b/tmp/aioble/l2cap.py @@ -0,0 +1,214 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import uasyncio as asyncio + +from .core import ble, log_error, register_irq_handler +from .device import DeviceConnection + + +_IRQ_L2CAP_ACCEPT = const(22) +_IRQ_L2CAP_CONNECT = const(23) +_IRQ_L2CAP_DISCONNECT = const(24) +_IRQ_L2CAP_RECV = const(25) +_IRQ_L2CAP_SEND_READY = const(26) + + +# Once we start listening we're listening forever. (Limitation in NimBLE) +_listening = False + + +def _l2cap_irq(event, data): + if event not in ( + _IRQ_L2CAP_CONNECT, + _IRQ_L2CAP_DISCONNECT, + _IRQ_L2CAP_RECV, + _IRQ_L2CAP_SEND_READY, + ): + return + + # All the L2CAP events start with (conn_handle, cid, ...) + if connection := DeviceConnection._connected.get(data[0], None): + if channel := connection._l2cap_channel: + # Expect to match the cid for this conn handle (unless we're + # waiting for connection in which case channel._cid is None). + if channel._cid is not None and channel._cid != data[1]: + return + + # Update the channel object with new information. + if event == _IRQ_L2CAP_CONNECT: + _, channel._cid, _, channel.our_mtu, channel.peer_mtu = data + elif event == _IRQ_L2CAP_DISCONNECT: + _, _, psm, status = data + channel._status = status + channel._cid = None + connection._l2cap_channel = None + elif event == _IRQ_L2CAP_RECV: + channel._data_ready = True + elif event == _IRQ_L2CAP_SEND_READY: + channel._stalled = False + + # Notify channel. + channel._event.set() + + +def _l2cap_shutdown(): + global _listening + _listening = False + + +register_irq_handler(_l2cap_irq, _l2cap_shutdown) + + +# The channel was disconnected during a send/recvinto/flush. +class L2CAPDisconnectedError(Exception): + pass + + +# Failed to connect to connection (argument is status). +class L2CAPConnectionError(Exception): + pass + + +class L2CAPChannel: + def __init__(self, connection): + if not connection.is_connected(): + raise ValueError("Not connected") + + if connection._l2cap_channel: + raise ValueError("Already has channel") + connection._l2cap_channel = self + + self._connection = connection + + # Maximum size that the other side can send to us. + self.our_mtu = 0 + # Maximum size that we can send. + self.peer_mtu = 0 + + # Set back to None on disconnection. + self._cid = None + # Set during disconnection. + self._status = 0 + + # If true, must wait for _IRQ_L2CAP_SEND_READY IRQ before sending. + self._stalled = False + + # Has received a _IRQ_L2CAP_RECV since the buffer was last emptied. + self._data_ready = False + + self._event = asyncio.ThreadSafeFlag() + + def _assert_connected(self): + if self._cid is None: + raise L2CAPDisconnectedError + + async def recvinto(self, buf, timeout_ms=None): + self._assert_connected() + + # Wait until the data_ready flag is set. This flag is only ever set by + # the event and cleared by this function. + with self._connection.timeout(timeout_ms): + while not self._data_ready: + await self._event.wait() + self._assert_connected() + + self._assert_connected() + + # Extract up to len(buf) bytes from the channel buffer. + n = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, buf) + + # Check if there's still remaining data in the channel buffers. + self._data_ready = ble.l2cap_recvinto(self._connection._conn_handle, self._cid, None) > 0 + + return n + + # Synchronously see if there's data ready. + def available(self): + self._assert_connected() + return self._data_ready + + # Waits until the channel is free and then sends buf. + # If the buffer is larger than the MTU it will be sent in chunks. + async def send(self, buf, timeout_ms=None, chunk_size=None): + self._assert_connected() + offset = 0 + chunk_size = min(self.our_mtu * 2, self.peer_mtu, chunk_size or self.peer_mtu) + mv = memoryview(buf) + while offset < len(buf): + if self._stalled: + await self.flush(timeout_ms) + # l2cap_send returns True if you can send immediately. + self._stalled = not ble.l2cap_send( + self._connection._conn_handle, + self._cid, + mv[offset : offset + chunk_size], + ) + offset += chunk_size + + async def flush(self, timeout_ms=None): + self._assert_connected() + # Wait for the _stalled flag to be cleared by the IRQ. + with self._connection.timeout(timeout_ms): + while self._stalled: + await self._event.wait() + self._assert_connected() + + async def disconnect(self, timeout_ms=1000): + if self._cid is None: + return + + # Wait for the cid to be cleared by the disconnect IRQ. + ble.l2cap_disconnect(self._connection._conn_handle, self._cid) + await self.disconnected(timeout_ms) + + async def disconnected(self, timeout_ms=1000): + with self._connection.timeout(timeout_ms): + while self._cid is not None: + await self._event.wait() + + # Context manager -- automatically disconnect. + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_traceback): + await self.disconnect() + + +# Use connection.l2cap_accept() instead of calling this directly. +async def accept(connection, psm, mtu, timeout_ms): + global _listening + + channel = L2CAPChannel(connection) + + # Start the stack listening if necessary. + if not _listening: + ble.l2cap_listen(psm, mtu) + _listening = True + + # Wait for the connect irq from the remote connection. + with connection.timeout(timeout_ms): + await channel._event.wait() + return channel + + +# Use connection.l2cap_connect() instead of calling this directly. +async def connect(connection, psm, mtu, timeout_ms): + if _listening: + raise ValueError("Can't connect while listening") + + channel = L2CAPChannel(connection) + + with connection.timeout(timeout_ms): + ble.l2cap_connect(connection._conn_handle, psm, mtu) + + # Wait for the connect irq from the remote connection. + # If the connection fails, we get a disconnect event (with status) instead. + await channel._event.wait() + + if channel._cid is not None: + return channel + else: + raise L2CAPConnectionError(channel._status) diff --git a/tmp/aioble/peripheral.py b/tmp/aioble/peripheral.py new file mode 100644 index 0000000..099f2c5 --- /dev/null +++ b/tmp/aioble/peripheral.py @@ -0,0 +1,179 @@ +# MicroPython aioble module +# MIT license; Copyright (c) 2021 Jim Mussared + +from micropython import const + +import bluetooth +import struct + +import uasyncio as asyncio + +from .core import ( + ensure_active, + ble, + log_info, + log_error, + log_warn, + register_irq_handler, +) +from .device import Device, DeviceConnection, DeviceTimeout + + +_IRQ_CENTRAL_CONNECT = const(1) +_IRQ_CENTRAL_DISCONNECT = const(2) + + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_UUID32_COMPLETE = const(0x5) +_ADV_TYPE_UUID128_COMPLETE = const(0x7) +_ADV_TYPE_UUID16_MORE = const(0x2) +_ADV_TYPE_UUID32_MORE = const(0x4) +_ADV_TYPE_UUID128_MORE = const(0x6) +_ADV_TYPE_APPEARANCE = const(0x19) +_ADV_TYPE_MANUFACTURER = const(0xFF) + +_ADV_PAYLOAD_MAX_LEN = const(31) + + +_incoming_connection = None +_connect_event = None + + +def _peripheral_irq(event, data): + global _incoming_connection + + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, addr_type, addr = data + + # Create, initialise, and register the device. + device = Device(addr_type, bytes(addr)) + _incoming_connection = DeviceConnection(device) + _incoming_connection._conn_handle = conn_handle + DeviceConnection._connected[conn_handle] = _incoming_connection + + # Signal advertise() to return the connected device. + _connect_event.set() + + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _ = data + if connection := DeviceConnection._connected.get(conn_handle, None): + # Tell the device_task that it should terminate. + connection._event.set() + + +def _peripheral_shutdown(): + global _incoming_connection, _connect_event + _incoming_connection = None + _connect_event = None + + +register_irq_handler(_peripheral_irq, _peripheral_shutdown) + + +# Advertising payloads are repeated packets of the following form: +# 1 byte data length (N + 1) +# 1 byte type (see constants below) +# N bytes type-specific data +def _append(adv_data, resp_data, adv_type, value): + data = struct.pack("BB", len(value) + 1, adv_type) + value + + if len(data) + len(adv_data) < _ADV_PAYLOAD_MAX_LEN: + adv_data += data + return resp_data + + if len(data) + (len(resp_data) if resp_data else 0) < _ADV_PAYLOAD_MAX_LEN: + if not resp_data: + # Overflow into resp_data for the first time. + resp_data = bytearray() + resp_data += data + return resp_data + + raise ValueError("Advertising payload too long") + + +async def advertise( + interval_us, + adv_data=None, + resp_data=None, + connectable=True, + limited_disc=False, + br_edr=False, + name=None, + services=None, + appearance=0, + manufacturer=None, + timeout_ms=None, +): + global _incoming_connection, _connect_event + + ensure_active() + + if not adv_data and not resp_data: + # If the user didn't manually specify adv_data / resp_data then + # construct them from the kwargs. Keep adding fields to adv_data, + # overflowing to resp_data if necessary. + # TODO: Try and do better bin-packing than just concatenating in + # order? + + adv_data = bytearray() + + resp_data = _append( + adv_data, + resp_data, + _ADV_TYPE_FLAGS, + struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)), + ) + + # Services are prioritised to go in the advertising data because iOS supports + # filtering scan results by service only, so services must come first. + if services: + for uuid in services: + b = bytes(uuid) + if len(b) == 2: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID16_COMPLETE, b) + elif len(b) == 4: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID32_COMPLETE, b) + elif len(b) == 16: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_UUID128_COMPLETE, b) + + if name: + resp_data = _append(adv_data, resp_data, _ADV_TYPE_NAME, name) + + if appearance: + # See org.bluetooth.characteristic.gap.appearance.xml + resp_data = _append( + adv_data, resp_data, _ADV_TYPE_APPEARANCE, struct.pack("> 2) & 0b10) + sMotD = 1 - ((motD >> 2) & 0b10) + motG = ((motG & 0b0111) * sMotG) * 100 / 7 + motD = ((motD & 0b0111) * sMotD) * 100 / 7 + print("Mot G :", motG) + print("Mot D :", motD) + alphabot.setMotors(left=motG, right=motD) + +while True: + joystickButton = alphabot.getJoystickValue() + if (joystickButton == "center"): + oled.text("Coucou !", 0, 0) + oled.show() + utime.sleep(1) + oled.fill(0) + oled.show() + if (joystickButton == "left"): + alphabot.setMotors(left=-100, right=100) + utime.sleep(1) + alphabot.stop() + if (joystickButton == "right"): + StartCPU("./out.bin", motorCallback) + alphabot.stop() + + diff --git a/tmp/neopixel.py b/tmp/neopixel.py new file mode 100644 index 0000000..140fa66 --- /dev/null +++ b/tmp/neopixel.py @@ -0,0 +1,45 @@ +# NeoPixel driver for MicroPython +# MIT license; Copyright (c) 2016 Damien P. George, 2021 Jim Mussared + +from machine import bitstream + +class NeoPixel: + # G R B W + ORDER = (1, 0, 2, 3) + + def __init__(self, pin, n, bpp=3, timing=1): + self.pin = pin + self.n = n + self.bpp = bpp + self.buf = bytearray(n * bpp) + self.pin.init(pin.OUT) + # Timing arg can either be 1 for 800kHz or 0 for 400kHz, + # or a user-specified timing ns tuple (high_0, low_0, high_1, low_1). + self.timing = ( + ((400, 850, 800, 450) if timing else (800, 1700, 1600, 900)) + if isinstance(timing, int) + else timing + ) + + def __len__(self): + return self.n + + def __setitem__(self, i, v): + offset = i * self.bpp + for i in range(self.bpp): + self.buf[offset + self.ORDER[i]] = v[i] + + def __getitem__(self, i): + offset = i * self.bpp + return tuple(self.buf[offset + self.ORDER[i]] for i in range(self.bpp)) + + def fill(self, v): + b = self.buf + for i in range(self.bpp): + c = v[i] + for j in range(self.ORDER[i], len(self.buf), self.bpp): + b[j] = c + + def write(self): + # BITSTREAM_TYPE_HIGH_LOW = 0 + bitstream(self.pin, 0, self.timing, self.buf) diff --git a/tmp/out.bin b/tmp/out.bin new file mode 100644 index 0000000..c599987 --- /dev/null +++ b/tmp/out.bin @@ -0,0 +1 @@ +àðød€ \ No newline at end of file diff --git a/tmp/pybcdc.inf b/tmp/pybcdc.inf new file mode 100644 index 0000000..a131ec7 --- /dev/null +++ b/tmp/pybcdc.inf @@ -0,0 +1,92 @@ +; Windows USB CDC ACM Setup File +; Based on INF files which were: +; Copyright (c) 2000 Microsoft Corporation +; Copyright (C) 2007 Microchip Technology Inc. +; Likely to be covered by the MLPL as found at: +; . + +[Version] +Signature="$Windows NT$" +Class=Ports +ClassGuid={4D36E978-E325-11CE-BFC1-08002BE10318} +Provider=%MFGNAME% +LayoutFile=layout.inf +DriverVer=03/11/2010,5.1.2600.3 + +[Manufacturer] +%MFGNAME%=DeviceList, NTamd64 + +[DestinationDirs] +DefaultDestDir=12 + +;--------------------------------------------------------------------- +; Windows 2000/XP/Server2003/Vista/Server2008/7 - 32bit Sections + +[DriverInstall.nt] +include=mdmcpq.inf +CopyFiles=DriverCopyFiles.nt +AddReg=DriverInstall.nt.AddReg + +[DriverCopyFiles.nt] +usbser.sys,,,0x20 + +[DriverInstall.nt.AddReg] +HKR,,DevLoader,,*ntkern +HKR,,NTMPDriver,,usbser.sys +HKR,,EnumPropPages32,,"MsPorts.dll,SerialPortPropPageProvider" + +[DriverInstall.nt.Services] +AddService=usbser, 0x00000002, DriverService.nt + +[DriverService.nt] +DisplayName=%SERVICE% +ServiceType=1 +StartType=3 +ErrorControl=1 +ServiceBinary=%12%\usbser.sys + +;--------------------------------------------------------------------- +; Windows XP/Server2003/Vista/Server2008/7 - 64bit Sections + +[DriverInstall.NTamd64] +include=mdmcpq.inf +CopyFiles=DriverCopyFiles.NTamd64 +AddReg=DriverInstall.NTamd64.AddReg + +[DriverCopyFiles.NTamd64] +usbser.sys,,,0x20 + +[DriverInstall.NTamd64.AddReg] +HKR,,DevLoader,,*ntkern +HKR,,NTMPDriver,,usbser.sys +HKR,,EnumPropPages32,,"MsPorts.dll,SerialPortPropPageProvider" + +[DriverInstall.NTamd64.Services] +AddService=usbser, 0x00000002, DriverService.NTamd64 + +[DriverService.NTamd64] +DisplayName=%SERVICE% +ServiceType=1 +StartType=3 +ErrorControl=1 +ServiceBinary=%12%\usbser.sys + +;--------------------------------------------------------------------- +; Vendor and Product ID Definitions + +[SourceDisksFiles] +[SourceDisksNames] +[DeviceList] +%DESCRIPTION%=DriverInstall, USB\VID_f055&PID_9800&MI_00, USB\VID_f055&PID_9800&MI_01, USB\VID_f055&PID_9801&MI_00, USB\VID_f055&PID_9801&MI_01, USB\VID_f055&PID_9802 + +[DeviceList.NTamd64] +%DESCRIPTION%=DriverInstall, USB\VID_f055&PID_9800&MI_00, USB\VID_f055&PID_9800&MI_01, USB\VID_f055&PID_9801&MI_00, USB\VID_f055&PID_9801&MI_01, USB\VID_f055&PID_9802 + +;--------------------------------------------------------------------- +; String Definitions + +[Strings] +MFGFILENAME="pybcdc" +MFGNAME="MicroPython" +DESCRIPTION="Pyboard USB Comm Port" +SERVICE="USB Serial Driver" diff --git a/tmp/stm32_TRsensors.py b/tmp/stm32_TRsensors.py new file mode 100644 index 0000000..371d250 --- /dev/null +++ b/tmp/stm32_TRsensors.py @@ -0,0 +1,199 @@ +""" +QTRSensors.h - Originally Arduino Library for using Pololu QTR reflectance sensors and reflectance sensor arrays + +MIT Licence +Copyright (c) 2008-2012 waveshare Corporation. For more information, see +https://www.waveshare.com/wiki/AlphaBot2-Ar + +Copyright (c) 2021 leomlr (Léo Meillier). For more information, see +https://github.com/vittascience/stm32-libraries +https://vittascience.com/stm32/ + +You may freely modify and share this code, as long as you keep this notice intact. + +Disclaimer: To the extent permitted by law, waveshare provides this work +without any warranty. It might be defective, in which case you agree +to be responsible for all resulting costs and damages. + +Author: Léo Meillier (leomlr) +Date: 07/2021 +Note: library adapted in micropython for using 5 QTR sensors on Alphabot v2 robot controlled by STM32 board. +""" + +import pyb +from micropython import const +import utime + +PIN_CS = 'D10' +PIN_DOUT = 'D11' +PIN_ADDR = 'D12' +PIN_CLK = 'D13' + +NUMSENSORS = const(5) + +QTR_EMITTERS_OFF = const(0x00) +QTR_EMITTERS_ON = const(0x01) +QTR_EMITTERS_ON_AND_OFF = const(0x02) + +QTR_NO_EMITTER_PIN = const(0xff) + +QTR_MAX_SENSORS = const(16) + +class TRSensors(object): + + """ Base class data member initialization (called by derived class init()). """ + def __init__(self, cs=PIN_CS, dout=PIN_DOUT, addr=PIN_ADDR, clk=PIN_CLK): + + self._cs = pyb.Pin(cs, pyb.Pin.OUT) + self._dout = pyb.Pin(dout, pyb.Pin.IN) + self._addr = pyb.Pin(addr, pyb.Pin.OUT) + self._clk = pyb.Pin(clk, pyb.Pin.OUT) + + self._numSensors = NUMSENSORS + + self.calibratedMin = [0] * self._numSensors + self.calibratedMax = [1023] * self._numSensors + self.last_value = 0 + + """ Reads the sensor values using TLC1543 ADC chip into an array. + The values returned are a measure of the reflectance in abstract units, + with higher values corresponding to lower reflectance (e.g. a black + surface or a void). """ + + def analogRead(self): + value = [0]* (self._numSensors+1) + #Read Channel0~channel4 AD value + for j in range(0, self._numSensors+1): + self._cs.off() + for i in range(0,4): + #sent 4-bit Address + if (j >> (3 - i)) & 0x01: + self._addr.on() + else: + self._addr.off() + #read MSB 4-bit data + value[j] <<= 1 + if self._dout.value(): + value[j] |= 0x01 + self._clk.on() + self._clk.off() + for i in range(0, self._numSensors+1): + #read LSB 8-bit data + value[j] <<= 1 + if self._dout.value(): + value[j] |= 0x01 + self._clk.on() + self._clk.off() + #no mean ,just delay + #for i in range(0,6): + # self._clk.on() + # self._clk.off() + utime.sleep_us(100) + self._cs.on() + return value[1:] + + """ Reads the sensors 10 times and uses the results for + calibration. The sensor values are not returned instead, the + maximum and minimum values found over time are stored internally + and used for the readCalibrated() method. """ + + def calibrate(self): + sensor_values = [] + max_sensor_values = [0]*self._numSensors + min_sensor_values = [0]*self._numSensors + + for j in range(0, 10): + sensor_values = self.analogRead() + for i in range(0, self._numSensors): + # set the max we found THIS time + if j == 0 or max_sensor_values[i] < sensor_values[i]: + max_sensor_values[i] = sensor_values[i] + # set the min we found THIS time + if j == 0 or min_sensor_values[i] > sensor_values[i]: + min_sensor_values[i] = sensor_values[i] + + # record the min and max calibration values + for i in range(0, self._numSensors): + if min_sensor_values[i] > self.calibratedMax[i]: + self.calibratedMax[i] = min_sensor_values[i] + if max_sensor_values[i] < self.calibratedMin[i]: + self.calibratedMin[i] = max_sensor_values[i] + + """ Returns values calibrated to a value between 0 and 1000, where + 0 corresponds to the minimum value read by calibrate() and 1000 + corresponds to the maximum value. Calibration values are + stored separately for each sensor, so that differences in the + sensors are accounted for automatically. """ + + def readCalibrated(self): + + # read the needed values + sensor_values = self.analogRead() + + for i in range(self._numSensors): + denominator = self.calibratedMax[i] - self.calibratedMin[i] + value = 0 + if denominator is not 0: + value = (sensor_values[i] - self.calibratedMin[i]) * 1000 / denominator + if value < 0: + value = 0 + elif value > 1000: + value = 1000 + sensor_values[i] = value + + return sensor_values + + """ Operates the same as read calibrated, but also returns an + estimated position of the robot with respect to a line. The + estimate is made using a weighted average of the sensor indices + multiplied by 1000, so that a return value of 0 indicates that + the line is directly below sensor 0, a return value of 1000 + indicates that the line is directly below sensor 1, 2000 + indicates that it's below sensor 2000, etc. Intermediate + values indicate that the line is between two sensors. The + formula is: + + 0*value0 + 1000*value1 + 2000*value2 + ... + -------------------------------------------- + value0 + value1 + value2 + ... + + By default, this function assumes a dark line (high values) + surrounded by white (low values). If your line is light on + black, set the optional second argument white_line to true. In + this case, each sensor value will be replaced by (1000-value) + before the averaging. """ + + def readLine(self, white_line = 0): + + sensor_values = self.readCalibrated() + avg = 0 + sum = 0 + on_line = 0 + for i in range(0, self._numSensors): + value = sensor_values[i] + if white_line: + value = 1000-value + # keep track of whether we see the line at all + if value > 200: + on_line = 1 + + # only average in values that are above a noise threshold + if value > 50: + avg += value * (i * 1000) # this is for the weighted total, + sum += value # this is for the denominator + + if on_line != 1: + # If it last read to the left of center, return 0. + if self.last_value < (self._numSensors - 1)*1000/2: + #print("left") + self.last_value = 0 + + # If it last read to the right of center, return the max. + else: + #print("right") + self.last_value = (self._numSensors - 1)*1000 + + else: + self.last_value = avg/sum + + return self.last_value,sensor_values; diff --git a/tmp/stm32_alphabot_v2.py b/tmp/stm32_alphabot_v2.py new file mode 100644 index 0000000..9513dfa --- /dev/null +++ b/tmp/stm32_alphabot_v2.py @@ -0,0 +1,265 @@ +""" +MicroPython for AlphaBot2-Ar from Waveshare. +https://github.com/vittascience/stm32-libraries +https://www.waveshare.com/wiki/AlphaBot2-Ar + +MIT License +Copyright (c) 2021 leomlr (Léo Meillier) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +__version__ = "0.0.0-auto.0" +__repo__ = "show" + +from stm32_TRsensors import TRSensors +from stm32_pcf8574 import PCF8574 +import machine +import pyb +import utime + +ALPHABOT_V2_PIN_AIN2 = 'A0' +ALPHABOT_V2_PIN_AIN1 = 'A1' +ALPHABOT_V2_PIN_BIN1 = 'A2' +ALPHABOT_V2_PIN_BIN2 = 'A3' + +ALPHABOT_V2_PIN_ECHO = 'D2' +ALPHABOT_V2_PIN_TRIG = 'D3' +ALPHABOT_V2_PIN_IR = 'D4' +ALPHABOT_V2_PIN_PWMB = 'D5' +ALPHABOT_V2_PIN_PWMA = 'D6' +ALPHABOT_V2_PIN_RGB = 'D7' + +ALPHABOT_V2_PIN_OLED_D_C = 'D8' +ALPHABOT_V2_PIN_OLED_RESET = 'D9' + +ALPHABOT_V2_PIN_TRS_CS = 'D10' +ALPHABOT_V2_PIN_TRS_DOUT = 'D11' +ALPHABOT_V2_PIN_TRS_ADDR = 'D12' +ALPHABOT_V2_PIN_TRS_CLK = 'D13' + +ALPHABOT_V2_PCF8574_I2C_ADDR = 0x20 +ALPHABOT_V2_OLED_I2C_ADDR_DC_OFF = 0x3c +ALPHABOT_V2_OLED_I2C_ADDR_DC_ON = 0x3d + +class AlphaBot_v2(object): + + def __init__(self): + self.ain1 = pyb.Pin(ALPHABOT_V2_PIN_AIN1, pyb.Pin.OUT) + self.ain2 = pyb.Pin(ALPHABOT_V2_PIN_AIN2, pyb.Pin.OUT) + self.bin1 = pyb.Pin(ALPHABOT_V2_PIN_BIN1, pyb.Pin.OUT) + self.bin2 = pyb.Pin(ALPHABOT_V2_PIN_BIN2, pyb.Pin.OUT) + + self.pin_PWMA = pyb.Pin(ALPHABOT_V2_PIN_PWMA, pyb.Pin.OUT_PP) + tim_A = pyb.Timer(1, freq=500) + self.PWMA = tim_A.channel(1, pyb.Timer.PWM, pin=self.pin_PWMA) + + self.pin_PWMB = pyb.Pin(ALPHABOT_V2_PIN_PWMB, pyb.Pin.OUT_PP) + tim_B = pyb.Timer(2, freq=500) + self.PWMB = tim_B.channel(1, pyb.Timer.PWM, pin=self.pin_PWMB) + + self.stop() + + print('[Alpha_INFO]: Motors initialised') + + self.trig = pyb.Pin(ALPHABOT_V2_PIN_TRIG, pyb.Pin.OUT) + self.echo = pyb.Pin(ALPHABOT_V2_PIN_ECHO, pyb.Pin.IN) + + self.pin_RGB = pyb.Pin(ALPHABOT_V2_PIN_RGB, pyb.Pin.OUT) + + self.tr_sensors = TRSensors( + cs = ALPHABOT_V2_PIN_TRS_CS, + dout = ALPHABOT_V2_PIN_TRS_DOUT, + addr = ALPHABOT_V2_PIN_TRS_ADDR, + clk = ALPHABOT_V2_PIN_TRS_CLK + ) + + print('[Alpha_INFO]: TR sensors initialised') + + self.i2c = machine.I2C(1) + + self.LEFT_OBSTACLE = 'L' + self.RIGHT_OBSTACLE = 'R' + self.BOTH_OBSTACLE = 'B' + self.NO_OBSTACLE = 'N' + + self.JOYSTICK_UP = 'up' + self.JOYSTICK_RIGHT = 'right' + self.JOYSTICK_LEFT = 'left' + self.JOYSTICK_DOWN = 'down' + self.JOYSTICK_CENTER = 'center' + + print('[Alpha_INFO]: IR detectors initialised (for obstacles)') + + self.pin_IR = pyb.Pin(ALPHABOT_V2_PIN_IR, pyb.Pin.IN) + + print('[Alpha_INFO]: IR receiver initialised (for remotes)') + + self.pin_oled_reset = pyb.Pin(ALPHABOT_V2_PIN_OLED_RESET, pyb.Pin.OUT) + self.pin_oled_reset.off() + utime.sleep_ms(10) + self.pin_oled_reset.on() + self.pin_DC = pyb.Pin(ALPHABOT_V2_PIN_OLED_D_C, pyb.Pin.OUT) + + print('[Alpha_INFO]: OLED screen initialised') + + self._pcf8574 = PCF8574(self.i2c, addr=ALPHABOT_V2_PCF8574_I2C_ADDR) + + def setPWMA(self, value): + self.PWMA.pulse_width_percent(value) + + def setPWMB(self, value): + self.PWMB.pulse_width_percent(value) + + def setMotors(self, left=None, right=None): + if left is not None: + if left >= 0 and left <= 100: + self.ain1.off() + self.ain2.on() + self.setPWMA(left) + elif left >= -100 and left < 0: + self.ain1.on() + self.ain2.off() + self.setPWMA(-left) + if right is not None: + if right >= 0 and right <= 100: + self.bin1.off() + self.bin2.on() + self.setPWMB(right) + elif right >= -100 and right < 0: + self.bin1.on() + self.bin2.off() + self.setPWMB(-right) + + def stop(self): + self.setMotors(left=0, right=0) + + def moveForward(self, speed, duration_ms=0): + self.setMotors(left=speed, right=speed) + if duration_ms: + utime.sleep_ms(duration_ms) + self.stop() + + def moveBackward(self, speed, duration_ms=0): + self.setMotors(left=-speed, right=-speed) + if duration_ms: + utime.sleep_ms(duration_ms) + self.stop() + + def turnLeft(self, speed, duration_ms=0): + if speed < 20: + self.setMotors(left=speed, right=50-speed) + else: + self.setMotors(left=30-speed, right=speed) + if duration_ms: + utime.sleep_ms(duration_ms) + self.stop() + + def turnRight(self, speed, duration_ms=0): + if speed < 20: + self.setMotors(left=50-speed, right=speed) + else: + self.setMotors(left=speed, right=30-speed) + if duration_ms: + utime.sleep_ms(duration_ms) + self.stop() + + def calibrateLineFinder(self): + print("[Alpha_INFO]: TR sensors calibration ...\\n") + for i in range(0, 100): + if i<25 or i>= 75: + self.turnRight(15) + else: + self.turnLeft(15) + self.TRSensors_calibrate() + self.stop() + print("Calibration done.\\n") + print(str(self.tr_sensors.calibratedMin) + '\\n') + print(str(self.tr_sensors.calibratedMax) + '\\n') + utime.sleep_ms(500) + + def TRSensors_calibrate(self): + self.tr_sensors.calibrate() + + def TRSensors_read(self, sensor = 0): + return self.tr_sensors.analogRead() + + def TRSensors_readLine(self, sensor = 0): + position, sensor_values = self.tr_sensors.readLine() + if sensor is 0: + return sensor_values + else: + return sensor_values[sensor-1] + + def TRSensors_position_readLine(self, sensor = 0): + return self.tr_sensors.readLine() + + def readUltrasonicDistance(self, length=15, timeout_us = 30000): + measurements = 0 + for i in range(length): + self.trig.off() + utime.sleep_us(2) + self.trig.on() + utime.sleep_us(10) + self.trig.off() + self.echo.value() + measurements += machine.time_pulse_us(self.echo, 1, timeout_us)/1e6 # t_echo in seconds + duration = measurements/length + return 343 * duration/2 * 100 + + def getOLEDaddr(self): + if self.pin_DC.value(): + return ALPHABOT_V2_OLED_I2C_ADDR_DC_ON + else: + return ALPHABOT_V2_OLED_I2C_ADDR_DC_OFF + + # Drivers for PCF8574T + + def controlBuzzer(self, state): + self._pcf8574.pin(5, state) + + def getJoystickValue(self): + i = 0 + for i in range(5): + if not self._pcf8574.pin(i): break + elif i == 4: i = None + if i == 0: + return self.JOYSTICK_UP + elif i == 1: + return self.JOYSTICK_RIGHT + elif i == 2: + return self.JOYSTICK_LEFT + elif i == 3: + return self.JOYSTICK_DOWN + elif i == 4: + return self.JOYSTICK_CENTER + else: + return None + + def readInfrared(self): + left = not self._pcf8574.pin(7) + right = not self._pcf8574.pin(6) + if left and not right: + return self.LEFT_OBSTACLE + elif not left and right: + return self.RIGHT_OBSTACLE + elif left and right: + return self.BOTH_OBSTACLE + else: + return self.NO_OBSTACLE diff --git a/tmp/stm32_bleAdvertising.py b/tmp/stm32_bleAdvertising.py new file mode 100644 index 0000000..74de473 --- /dev/null +++ b/tmp/stm32_bleAdvertising.py @@ -0,0 +1,91 @@ +# Exemple pour générer des trames d'advertising pour le BLE + +from micropython import const +import struct +import bluetooth + +# Les trames d'advertising sont sont des paquets répétés ayant la structure suivante : +# 1 octet indiquant la taille des données (N + 1) +# 1 octet indiquant le type de données (voir les constantes ci-dessous) +# N octets de données du type indiqué + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_UUID32_COMPLETE = const(0x5) +_ADV_TYPE_UUID128_COMPLETE = const(0x7) +_ADV_TYPE_UUID16_MORE = const(0x2) +_ADV_TYPE_UUID32_MORE = const(0x4) +_ADV_TYPE_UUID128_MORE = const(0x6) +_ADV_TYPE_APPEARANCE = const(0x19) +_ADV_TYPE_MANUFACTURER = const(0xFF) + +# Génère une trame qui sera passée à la méthode gap_advertise(adv_data=...). + + +def adv_payload( + limited_disc=False, + br_edr=False, + name=None, + services=None, + appearance=0, + manufacturer=0, +): + payload = bytearray() + + def _append(adv_type, value): + nonlocal payload + payload += struct.pack("BB", len(value) + 1, adv_type) + value + + _append( + _ADV_TYPE_FLAGS, + struct.pack("B", (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)), + ) + + if name: + _append(_ADV_TYPE_NAME, name) + + if services: + for uuid in services: + b = bytes(uuid) + if len(b) == 2: + _append(_ADV_TYPE_UUID16_COMPLETE, b) + elif len(b) == 4: + _append(_ADV_TYPE_UUID32_COMPLETE, b) + elif len(b) == 16: + _append(_ADV_TYPE_UUID128_COMPLETE, b) + + if appearance: + # Voir org.bluetooth.characteristic.gap.appearance.xml + _append(_ADV_TYPE_APPEARANCE, struct.pack("= thresh: + self.callback(cmd, addr, ext, *self.args) + else: + self._errf(cmd) + + def error_function(self, func): + self._errf = func + + def close(self): + self._pin.irq(handler = None) + self.tim.deinit() diff --git a/tmp/stm32_nec.py b/tmp/stm32_nec.py new file mode 100644 index 0000000..a8d8b19 --- /dev/null +++ b/tmp/stm32_nec.py @@ -0,0 +1,62 @@ +# nec.py Decoder for IR remote control using synchronous code +# Supports NEC protocol. +# For a remote using NEC see https://www.adafruit.com/products/389 + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +from utime import ticks_us, ticks_diff +from stm32_ir_receiver import IR_RX + +class NEC_ABC(IR_RX): + def __init__(self, pin, extended, callback, *args): + # Block lasts <= 80ms (extended mode) and has 68 edges + super().__init__(pin, 68, 80, callback, *args) + self._extended = extended + self._addr = 0 + + def decode(self, _): + try: + if self.edge > 68: + raise RuntimeError(self.OVERRUN) + width = ticks_diff(self._times[1], self._times[0]) + if width < 4000: # 9ms leading mark for all valid data + raise RuntimeError(self.BADSTART) + width = ticks_diff(self._times[2], self._times[1]) + if width > 3000: # 4.5ms space for normal data + if self.edge < 68: # Haven't received the correct number of edges + raise RuntimeError(self.BADBLOCK) + # Time spaces only (marks are always 562.5µs) + # Space is 1.6875ms (1) or 562.5µs (0) + # Skip last bit which is always 1 + val = 0 + for edge in range(3, 68 - 2, 2): + val >>= 1 + if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120: + val |= 0x80000000 + elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges. + raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP) # Treat REPEAT as error. + else: + raise RuntimeError(self.BADSTART) + addr = val & 0xff # 8 bit addr + cmd = (val >> 16) & 0xff + if cmd != (val >> 24) ^ 0xff: + raise RuntimeError(self.BADDATA) + if addr != ((val >> 8) ^ 0xff) & 0xff: # 8 bit addr doesn't match check + if not self._extended: + raise RuntimeError(self.BADADDR) + addr |= val & 0xff00 # pass assumed 16 bit address to callback + self._addr = addr + except RuntimeError as e: + cmd = e.args[0] + addr = self._addr if cmd == self.REPEAT else 0 # REPEAT uses last address + # Set up for new data burst and run user callback + self.do_callback(cmd, addr, 0, self.REPEAT) + +class NEC_8(NEC_ABC): + def __init__(self, pin, callback, *args): + super().__init__(pin, False, callback, *args) + +class NEC_16(NEC_ABC): + def __init__(self, pin, callback, *args): + super().__init__(pin, True, callback, *args) diff --git a/tmp/stm32_pcf8574.py b/tmp/stm32_pcf8574.py new file mode 100644 index 0000000..065d6a8 --- /dev/null +++ b/tmp/stm32_pcf8574.py @@ -0,0 +1,49 @@ +class PCF8574: + def __init__(self, i2c, addr=0x20): + self._i2c = i2c + i2cModules = self._i2c.scan() + if addr not in i2cModules: + error = "Unable to find module 'PCF8574' at address " + str(hex(addr)) + ". Please check connections with the board.\n" + error += "[Info] I2C address.es detected: " + str([hex(a) for a in i2cModules]) + raise ValueError(error) + self._addr = addr + self._port = bytearray(1) + + @property + def port(self): + self._read() + return self._port[0] + + @port.setter + def port(self, value): + self._port[0] = value & 0xff + self._write() + + def pin(self, pin, value=None): + pin = self.validate_pin(pin) + if value is None: + self._read() + return (self._port[0] >> pin) & 1 + else: + if value: + self._port[0] |= (1 << (pin)) + else: + self._port[0] &= ~(1 << (pin)) + self._write() + + def toggle(self, pin): + pin = self.validate_pin(pin) + self._port[0] ^= (1 << (pin)) + self._write() + + def validate_pin(self, pin): + # pin valid range 0..7 + if not 0 <= pin <= 7: + raise ValueError('Invalid pin {}. Use 0-7.'.format(pin)) + return pin + + def _read(self): + self._i2c.readfrom_into(self._addr, self._port) + + def _write(self): + self._i2c.writeto(self._addr, self._port) diff --git a/tmp/stm32_ssd1306.py b/tmp/stm32_ssd1306.py new file mode 100644 index 0000000..96e78ac --- /dev/null +++ b/tmp/stm32_ssd1306.py @@ -0,0 +1,131 @@ +# MicroPython SSD1306 OLED I2C driver +from micropython import const +import framebuf +import utime + +SSD1306_I2C_ADDR = 0x3C + +# register definitions +SET_CONTRAST = const(0x81) +SET_ENTIRE_ON = const(0xA4) +SET_NORM_INV = const(0xA6) +SET_DISP = const(0xAE) +SET_MEM_ADDR = const(0x20) +SET_COL_ADDR = const(0x21) +SET_PAGE_ADDR = const(0x22) +SET_DISP_START_LINE = const(0x40) +SET_SEG_REMAP = const(0xA0) +SET_MUX_RATIO = const(0xA8) +SET_COM_OUT_DIR = const(0xC0) +SET_DISP_OFFSET = const(0xD3) +SET_COM_PIN_CFG = const(0xDA) +SET_DISP_CLK_DIV = const(0xD5) +SET_PRECHARGE = const(0xD9) +SET_VCOM_DESEL = const(0xDB) +SET_CHARGE_PUMP = const(0x8D) + +# Subclassing FrameBuffer provides support for graphics primitives +# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html +class SSD1306(framebuf.FrameBuffer): + def __init__(self, width, height, external_vcc): + self.width = width + self.height = height + self.external_vcc = external_vcc + self.pages = self.height // 8 + self.buffer = bytearray(self.pages * self.width) + super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) + self.init_display() + + def init_display(self): + for cmd in ( + SET_DISP, # display off + # address setting + SET_MEM_ADDR, + 0x00, # horizontal + # resolution and layout + SET_DISP_START_LINE, # start at line 0 + SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0 + SET_MUX_RATIO, + self.height - 1, + SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0 + SET_DISP_OFFSET, + 0x00, + SET_COM_PIN_CFG, + 0x02 if self.width > 2 * self.height else 0x12, + # timing and driving scheme + SET_DISP_CLK_DIV, + 0x80, + SET_PRECHARGE, + 0x22 if self.external_vcc else 0xF1, + SET_VCOM_DESEL, + 0x30, # 0.83*Vcc + # display + SET_CONTRAST, + 0xFF, # maximum + SET_ENTIRE_ON, # output follows RAM contents + SET_NORM_INV, # not inverted + # charge pump + SET_CHARGE_PUMP, + 0x10 if self.external_vcc else 0x14, + SET_DISP | 0x01, # display on + ): # on + self.write_cmd(cmd) + self.fill(0) + self.show() + + def poweroff(self): + self.write_cmd(SET_DISP) + + def poweron(self): + self.write_cmd(SET_DISP | 0x01) + + def contrast(self, contrast): + self.write_cmd(SET_CONTRAST) + self.write_cmd(contrast) + + def invert(self, invert): + self.write_cmd(SET_NORM_INV | (invert & 1)) + + def rotate(self, rotate): + self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3)) + self.write_cmd(SET_SEG_REMAP | (rotate & 1)) + + def show(self): + x0 = 0 + x1 = self.width - 1 + if self.width == 64: + # displays with width of 64 pixels are shifted by 32 + x0 += 32 + x1 += 32 + self.write_cmd(SET_COL_ADDR) + self.write_cmd(x0) + self.write_cmd(x1) + self.write_cmd(SET_PAGE_ADDR) + self.write_cmd(0) + self.write_cmd(self.pages - 1) + self.write_data(self.buffer) + +class SSD1306_I2C(SSD1306): + def __init__(self, width, height, i2c, addr=SSD1306_I2C_ADDR, external_vcc=False): + if i2c == None: + raise ValueError("I2C object 'SSD1306' needed as argument!") + self._i2c = i2c + utime.sleep_ms(200) + i2cModules = self._i2c.scan() + if addr not in i2cModules: + error = "Unable to find module 'SSD1306' at address " + str(hex(addr)) + ". Please check connections with the board.\n" + error += "[Info] I2C address.es detected: " + str([hex(a) for a in i2cModules]) + raise ValueError(error) + self._addr = addr + self.temp = bytearray(2) + self.write_list = [b"\x40", None] # Co=0, D/C#=1 + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.temp[0] = 0x80 # Co=1, D/C#=0 + self.temp[1] = cmd + self._i2c.writeto(self._addr, self.temp) + + def write_data(self, buf): + self.write_list[1] = buf + self._i2c.writevto(self._addr, self.write_list) diff --git a/tmp/stm32_vl53l0x.py b/tmp/stm32_vl53l0x.py new file mode 100644 index 0000000..7878955 --- /dev/null +++ b/tmp/stm32_vl53l0x.py @@ -0,0 +1,525 @@ +""" +MicroPython for Grove Time Of Flight VL53L0X sensor (I2C). +https://github.com/vittascience/stm32-libraries +https://wiki.seeedstudio.com/Grove-Time_of_Flight_Distance_Sensor-VL53L0X/ + +MIT License +Copyright (c) 2020 leomlr (Léo Meillier) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +__version__ = "0.0.0-auto.0" +__repo__ = "https://github.com/Vittascience/stm32-libraries" + +from micropython import const +import utime +import math + +_VL53L0X_IIC_ADDR = const(0x29) + +# Configuration constants: +_SYSRANGE_START = const(0x00) +_SYSTEM_THRESH_HIGH = const(0x0C) +_SYSTEM_THRESH_LOW = const(0x0E) +_SYSTEM_SEQUENCE_CONFIG = const(0x01) +_SYSTEM_RANGE_CONFIG = const(0x09) +_SYSTEM_INTERMEASUREMENT_PERIOD = const(0x04) +_SYSTEM_INTERRUPT_CONFIG_GPIO = const(0x0A) +_GPIO_HV_MUX_ACTIVE_HIGH = const(0x84) +_SYSTEM_INTERRUPT_CLEAR = const(0x0B) +_RESULT_INTERRUPT_STATUS = const(0x13) +_RESULT_RANGE_STATUS = const(0x14) +_RESULT_CORE_AMBIENT_WINDOW_EVENTS_RTN = const(0xBC) +_RESULT_CORE_RANGING_TOTAL_EVENTS_RTN = const(0xC0) +_RESULT_CORE_AMBIENT_WINDOW_EVENTS_REF = const(0xD0) +_RESULT_CORE_RANGING_TOTAL_EVENTS_REF = const(0xD4) +_RESULT_PEAK_SIGNAL_RATE_REF = const(0xB6) +_ALGO_PART_TO_PART_RANGE_OFFSET_MM = const(0x28) +_I2C_SLAVE_DEVICE_ADDRESS = const(0x8A) +_MSRC_CONFIG_CONTROL = const(0x60) +_PRE_RANGE_CONFIG_MIN_SNR = const(0x27) +_PRE_RANGE_CONFIG_VALID_PHASE_LOW = const(0x56) +_PRE_RANGE_CONFIG_VALID_PHASE_HIGH = const(0x57) +_PRE_RANGE_MIN_COUNT_RATE_RTN_LIMIT = const(0x64) +_FINAL_RANGE_CONFIG_MIN_SNR = const(0x67) +_FINAL_RANGE_CONFIG_VALID_PHASE_LOW = const(0x47) +_FINAL_RANGE_CONFIG_VALID_PHASE_HIGH = const(0x48) +_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT = const(0x44) +_PRE_RANGE_CONFIG_SIGMA_THRESH_HI = const(0x61) +_PRE_RANGE_CONFIG_SIGMA_THRESH_LO = const(0x62) +_PRE_RANGE_CONFIG_VCSEL_PERIOD = const(0x50) +_PRE_RANGE_CONFIG_TIMEOUT_MACROP_HI = const(0x51) +_PRE_RANGE_CONFIG_TIMEOUT_MACROP_LO = const(0x52) +_SYSTEM_HISTOGRAM_BIN = const(0x81) +_HISTOGRAM_CONFIG_INITIAL_PHASE_SELECT = const(0x33) +_HISTOGRAM_CONFIG_READOUT_CTRL = const(0x55) +_FINAL_RANGE_CONFIG_VCSEL_PERIOD = const(0x70) +_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI = const(0x71) +_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_LO = const(0x72) +_CROSSTALK_COMPENSATION_PEAK_RATE_MCPS = const(0x20) +_MSRC_CONFIG_TIMEOUT_MACROP = const(0x46) +_SOFT_RESET_GO2_SOFT_RESET_N = const(0xBF) +_IDENTIFICATION_MODEL_ID = const(0xC0) +_IDENTIFICATION_REVISION_ID = const(0xC2) +_OSC_CALIBRATE_VAL = const(0xF8) +_GLOBAL_CONFIG_VCSEL_WIDTH = const(0x32) +_GLOBAL_CONFIG_SPAD_ENABLES_REF_0 = const(0xB0) +_GLOBAL_CONFIG_SPAD_ENABLES_REF_1 = const(0xB1) +_GLOBAL_CONFIG_SPAD_ENABLES_REF_2 = const(0xB2) +_GLOBAL_CONFIG_SPAD_ENABLES_REF_3 = const(0xB3) +_GLOBAL_CONFIG_SPAD_ENABLES_REF_4 = const(0xB4) +_GLOBAL_CONFIG_SPAD_ENABLES_REF_5 = const(0xB5) +_GLOBAL_CONFIG_REF_EN_START_SELECT = const(0xB6) +_DYNAMIC_SPAD_NUM_REQUESTED_REF_SPAD = const(0x4E) +_DYNAMIC_SPAD_REF_EN_START_OFFSET = const(0x4F) +_POWER_MANAGEMENT_GO1_POWER_FORCE = const(0x80) +_VHV_CONFIG_PAD_SCL_SDA__EXTSUP_HV = const(0x89) +_ALGO_PHASECAL_LIM = const(0x30) +_ALGO_PHASECAL_CONFIG_TIMEOUT = const(0x30) +_VCSEL_PERIOD_PRE_RANGE = const(0) +_VCSEL_PERIOD_FINAL_RANGE = const(1) + +def _decode_timeout(val): + # format: "(LSByte * 2^MSByte) + 1" + return float(val & 0xFF) * math.pow(2.0, ((val & 0xFF00) >> 8)) + 1 + +def _encode_timeout(timeout_mclks): + # format: "(LSByte * 2^MSByte) + 1" + timeout_mclks = int(timeout_mclks) & 0xFFFF + ls_byte = 0 + ms_byte = 0 + if timeout_mclks > 0: + ls_byte = timeout_mclks - 1 + while ls_byte > 255: + ls_byte >>= 1 + ms_byte += 1 + return ((ms_byte << 8) | (ls_byte & 0xFF)) & 0xFFFF + return 0 + +def _timeout_mclks_to_microseconds(timeout_period_mclks, vcsel_period_pclks): + macro_period_ns = ((2304 * (vcsel_period_pclks) * 1655) + 500) // 1000 + return ((timeout_period_mclks * macro_period_ns) + (macro_period_ns // 2)) // 1000 + +def _timeout_microseconds_to_mclks(timeout_period_us, vcsel_period_pclks): + macro_period_ns = ((2304 * (vcsel_period_pclks) * 1655) + 500) // 1000 + return ((timeout_period_us * 1000) + (macro_period_ns // 2)) // macro_period_ns + +class VL53L0X: + """Driver for the VL53L0X distance sensor.""" + + def __init__(self, i2c, address=_VL53L0X_IIC_ADDR, io_timeout_s=0): + # pylint: disable=too-many-statements + self._i2c = i2c + self._addr = address + self.io_timeout_s = io_timeout_s + # Check identification registers for expected values. + # From section 3.2 of the datasheet. + if ( + self._read_u8(0xC0) is not 0xEE + or self._read_u8(0xC1) is not 0xAA + or self._read_u8(0xC2) is not 0x10 + ): + raise RuntimeError("Failed to find expected ID register values. Check wiring!") + # Initialize access to the sensor. This is based on the logic from: + # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp + # Set I2C standard mode. + for pair in ((0x88, 0x00), (0x80, 0x01), (0xFF, 0x01), (0x00, 0x00)): + self._write_u8(pair[0], pair[1]) + self._stop_variable = self._read_u8(0x91) + for pair in ((0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)): + self._write_u8(pair[0], pair[1]) + # disable SIGNAL_RATE_MSRC (bit 1) and SIGNAL_RATE_PRE_RANGE (bit 4) + # limit checks + config_control = self._read_u8(_MSRC_CONFIG_CONTROL) | 0x12 + self._write_u8(_MSRC_CONFIG_CONTROL, config_control) + # set final range signal rate limit to 0.25 MCPS (million counts per + # second) + self.signal_rate_limit = 0.25 + self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xFF) + spad_count, spad_is_aperture = self._get_spad_info() + # The SPAD map (RefGoodSpadMap) is read by + # VL53L0X_get_info_from_device() in the API, but the same data seems to + # be more easily readable from GLOBAL_CONFIG_SPAD_ENABLES_REF_0 through + # _6, so read it from there. + ref_spad_map = bytearray(1) + ref_spad_map[0] = _GLOBAL_CONFIG_SPAD_ENABLES_REF_0 + self._i2c.writeto(self._addr, ref_spad_map) + buf = bytearray(6) + self._i2c.readfrom_mem_into(self._addr, ref_spad_map[0], buf) + ref_spad_map.extend(buf) + + for pair in ( + (0xFF, 0x01), + (_DYNAMIC_SPAD_REF_EN_START_OFFSET, 0x00), + (_DYNAMIC_SPAD_NUM_REQUESTED_REF_SPAD, 0x2C), + (0xFF, 0x00), + (_GLOBAL_CONFIG_REF_EN_START_SELECT, 0xB4), + ): + self._write_u8(pair[0], pair[1]) + + first_spad_to_enable = 12 if spad_is_aperture else 0 + spads_enabled = 0 + for i in range(48): + if i < first_spad_to_enable or spads_enabled == spad_count: + # This bit is lower than the first one that should be enabled, + # or (reference_spad_count) bits have already been enabled, so + # zero this bit. + ref_spad_map[1 + (i // 8)] &= ~(1 << (i % 8)) + elif (ref_spad_map[1 + (i // 8)] >> (i % 8)) & 0x1 > 0: + spads_enabled += 1 + self._i2c.writeto(self._addr, ref_spad_map) + for pair in ( + (0xFF, 0x01), + (0x00, 0x00), + (0xFF, 0x00), + (0x09, 0x00), + (0x10, 0x00), + (0x11, 0x00), + (0x24, 0x01), + (0x25, 0xFF), + (0x75, 0x00), + (0xFF, 0x01), + (0x4E, 0x2C), + (0x48, 0x00), + (0x30, 0x20), + (0xFF, 0x00), + (0x30, 0x09), + (0x54, 0x00), + (0x31, 0x04), + (0x32, 0x03), + (0x40, 0x83), + (0x46, 0x25), + (0x60, 0x00), + (0x27, 0x00), + (0x50, 0x06), + (0x51, 0x00), + (0x52, 0x96), + (0x56, 0x08), + (0x57, 0x30), + (0x61, 0x00), + (0x62, 0x00), + (0x64, 0x00), + (0x65, 0x00), + (0x66, 0xA0), + (0xFF, 0x01), + (0x22, 0x32), + (0x47, 0x14), + (0x49, 0xFF), + (0x4A, 0x00), + (0xFF, 0x00), + (0x7A, 0x0A), + (0x7B, 0x00), + (0x78, 0x21), + (0xFF, 0x01), + (0x23, 0x34), + (0x42, 0x00), + (0x44, 0xFF), + (0x45, 0x26), + (0x46, 0x05), + (0x40, 0x40), + (0x0E, 0x06), + (0x20, 0x1A), + (0x43, 0x40), + (0xFF, 0x00), + (0x34, 0x03), + (0x35, 0x44), + (0xFF, 0x01), + (0x31, 0x04), + (0x4B, 0x09), + (0x4C, 0x05), + (0x4D, 0x04), + (0xFF, 0x00), + (0x44, 0x00), + (0x45, 0x20), + (0x47, 0x08), + (0x48, 0x28), + (0x67, 0x00), + (0x70, 0x04), + (0x71, 0x01), + (0x72, 0xFE), + (0x76, 0x00), + (0x77, 0x00), + (0xFF, 0x01), + (0x0D, 0x01), + (0xFF, 0x00), + (0x80, 0x01), + (0x01, 0xF8), + (0xFF, 0x01), + (0x8E, 0x01), + (0x00, 0x01), + (0xFF, 0x00), + (0x80, 0x00), + ): + self._write_u8(pair[0], pair[1]) + + self._write_u8(_SYSTEM_INTERRUPT_CONFIG_GPIO, 0x04) + gpio_hv_mux_active_high = self._read_u8(_GPIO_HV_MUX_ACTIVE_HIGH) + self._write_u8( + _GPIO_HV_MUX_ACTIVE_HIGH, gpio_hv_mux_active_high & ~0x10 + ) # active low + self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01) + self._measurement_timing_budget_us = self.measurement_timing_budget + self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xE8) + self.measurement_timing_budget = self._measurement_timing_budget_us + self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0x01) + self._perform_single_ref_calibration(0x40) + self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0x02) + self._perform_single_ref_calibration(0x00) + # "restore the previous Sequence Config" + self._write_u8(_SYSTEM_SEQUENCE_CONFIG, 0xE8) + + def _read_u8(self, address): + # Read an 8-bit unsigned value from the specified 8-bit address. + buf = self._i2c.readfrom_mem(self._addr, address, 1) + return buf[0] + + def _read_u16(self, address): + # Read a 16-bit BE unsigned value from the specified 8-bit address. + buf = self._i2c.readfrom_mem(self._addr, address, 2) + return (buf[0] << 8) | buf[1] + + def _write_u8(self, address, val): + # Write an 8-bit unsigned value to the specified 8-bit address. + self._i2c.writeto(self._addr, bytearray([address & 0xFF, val & 0xFF])) + + def _write_u16(self, address, val): + # Write a 16-bit BE unsigned value to the specified 8-bit address. + self._i2c.writeto(self._addr, bytearray([address & 0xFF, (val >> 8) & 0xFF, val & 0xFF])) + + def _get_spad_info(self): + # Get reference SPAD count and type, returned as a 2-tuple of + # count and boolean is_aperture. Based on code from: + # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp + for pair in ((0x80, 0x01), (0xFF, 0x01), (0x00, 0x00), (0xFF, 0x06)): + self._write_u8(pair[0], pair[1]) + self._write_u8(0x83, self._read_u8(0x83) | 0x04) + for pair in ( + (0xFF, 0x07), + (0x81, 0x01), + (0x80, 0x01), + (0x94, 0x6B), + (0x83, 0x00), + ): + self._write_u8(pair[0], pair[1]) + start = utime.gmtime() + while self._read_u8(0x83) == 0x00: + if ( + self.io_timeout_s > 0 + and (utime.gmtime() - start) >= self.io_timeout_s + ): + raise RuntimeError("Timeout waiting for VL53L0X!") + self._write_u8(0x83, 0x01) + tmp = self._read_u8(0x92) + count = tmp & 0x7F + is_aperture = ((tmp >> 7) & 0x01) == 1 + for pair in ((0x81, 0x00), (0xFF, 0x06)): + self._write_u8(pair[0], pair[1]) + self._write_u8(0x83, self._read_u8(0x83) & ~0x04) + for pair in ((0xFF, 0x01), (0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)): + self._write_u8(pair[0], pair[1]) + return (count, is_aperture) + + def _perform_single_ref_calibration(self, vhv_init_byte): + # based on VL53L0X_perform_single_ref_calibration() from ST API. + self._write_u8(_SYSRANGE_START, 0x01 | vhv_init_byte & 0xFF) + start = utime.gmtime() + while (self._read_u8(_RESULT_INTERRUPT_STATUS) & 0x07) == 0: + if ( + self.io_timeout_s > 0 + and (utime.gmtime() - start) >= self.io_timeout_s + ): + raise RuntimeError("Timeout waiting for VL53L0X!") + self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01) + self._write_u8(_SYSRANGE_START, 0x00) + + def _get_vcsel_pulse_period(self, vcsel_period_type): + # pylint: disable=no-else-return + # Disable should be removed when refactor can be tested + if vcsel_period_type == _VCSEL_PERIOD_PRE_RANGE: + val = self._read_u8(_PRE_RANGE_CONFIG_VCSEL_PERIOD) + return (((val) + 1) & 0xFF) << 1 + elif vcsel_period_type == _VCSEL_PERIOD_FINAL_RANGE: + val = self._read_u8(_FINAL_RANGE_CONFIG_VCSEL_PERIOD) + return (((val) + 1) & 0xFF) << 1 + return 255 + + def _get_sequence_step_enables(self): + # based on VL53L0X_GetSequenceStepEnables() from ST API + sequence_config = self._read_u8(_SYSTEM_SEQUENCE_CONFIG) + tcc = (sequence_config >> 4) & 0x1 > 0 + dss = (sequence_config >> 3) & 0x1 > 0 + msrc = (sequence_config >> 2) & 0x1 > 0 + pre_range = (sequence_config >> 6) & 0x1 > 0 + final_range = (sequence_config >> 7) & 0x1 > 0 + return (tcc, dss, msrc, pre_range, final_range) + + def _get_sequence_step_timeouts(self, pre_range): + # based on get_sequence_step_timeout() from ST API but modified by + # pololu here: + # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp + pre_range_vcsel_period_pclks = self._get_vcsel_pulse_period( + _VCSEL_PERIOD_PRE_RANGE + ) + msrc_dss_tcc_mclks = (self._read_u8(_MSRC_CONFIG_TIMEOUT_MACROP) + 1) & 0xFF + msrc_dss_tcc_us = _timeout_mclks_to_microseconds( + msrc_dss_tcc_mclks, pre_range_vcsel_period_pclks + ) + pre_range_mclks = _decode_timeout( + self._read_u16(_PRE_RANGE_CONFIG_TIMEOUT_MACROP_HI) + ) + pre_range_us = _timeout_mclks_to_microseconds( + pre_range_mclks, pre_range_vcsel_period_pclks + ) + final_range_vcsel_period_pclks = self._get_vcsel_pulse_period( + _VCSEL_PERIOD_FINAL_RANGE + ) + final_range_mclks = _decode_timeout( + self._read_u16(_FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI) + ) + if pre_range: + final_range_mclks -= pre_range_mclks + final_range_us = _timeout_mclks_to_microseconds( + final_range_mclks, final_range_vcsel_period_pclks + ) + return ( + msrc_dss_tcc_us, + pre_range_us, + final_range_us, + final_range_vcsel_period_pclks, + pre_range_mclks, + ) + + @property + def signal_rate_limit(self): + """The signal rate limit in mega counts per second.""" + val = self._read_u16(_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT) + # Return value converted from 16-bit 9.7 fixed point to float. + return val / (1 << 7) + + @signal_rate_limit.setter + def signal_rate_limit(self, val): + assert 0.0 <= val <= 511.99 + # Convert to 16-bit 9.7 fixed point value from a float. + val = int(val * (1 << 7)) + self._write_u16(_FINAL_RANGE_CONFIG_MIN_COUNT_RATE_RTN_LIMIT, val) + + @property + def measurement_timing_budget(self): + """The measurement timing budget in microseconds.""" + budget_us = 1910 + 960 # Start overhead + end overhead. + tcc, dss, msrc, pre_range, final_range = self._get_sequence_step_enables() + step_timeouts = self._get_sequence_step_timeouts(pre_range) + msrc_dss_tcc_us, pre_range_us, final_range_us, _, _ = step_timeouts + if tcc: + budget_us += msrc_dss_tcc_us + 590 + if dss: + budget_us += 2 * (msrc_dss_tcc_us + 690) + elif msrc: + budget_us += msrc_dss_tcc_us + 660 + if pre_range: + budget_us += pre_range_us + 660 + if final_range: + budget_us += final_range_us + 550 + self._measurement_timing_budget_us = budget_us + return budget_us + + @measurement_timing_budget.setter + def measurement_timing_budget(self, budget_us): + # pylint: disable=too-many-locals + assert budget_us >= 20000 + used_budget_us = 1320 + 960 # Start (diff from get) + end overhead + tcc, dss, msrc, pre_range, final_range = self._get_sequence_step_enables() + step_timeouts = self._get_sequence_step_timeouts(pre_range) + msrc_dss_tcc_us, pre_range_us, _ = step_timeouts[:3] + final_range_vcsel_period_pclks, pre_range_mclks = step_timeouts[3:] + if tcc: + used_budget_us += msrc_dss_tcc_us + 590 + if dss: + used_budget_us += 2 * (msrc_dss_tcc_us + 690) + elif msrc: + used_budget_us += msrc_dss_tcc_us + 660 + if pre_range: + used_budget_us += pre_range_us + 660 + if final_range: + used_budget_us += 550 + # "Note that the final range timeout is determined by the timing + # budget and the sum of all other timeouts within the sequence. + # If there is no room for the final range timeout, then an error + # will be set. Otherwise the remaining time will be applied to + # the final range." + if used_budget_us > budget_us: + raise ValueError("Requested timeout too big.") + final_range_timeout_us = budget_us - used_budget_us + final_range_timeout_mclks = _timeout_microseconds_to_mclks( + final_range_timeout_us, final_range_vcsel_period_pclks + ) + if pre_range: + final_range_timeout_mclks += pre_range_mclks + self._write_u16( + _FINAL_RANGE_CONFIG_TIMEOUT_MACROP_HI, + _encode_timeout(final_range_timeout_mclks), + ) + self._measurement_timing_budget_us = budget_us + + def getRangeMillimeters(self): + """Perform a single reading of the range for an object in front of + the sensor and return the distance in millimeters. + """ + # Adapted from readRangeSingleMillimeters & + # readRangeContinuousMillimeters in pololu code at: + # https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp + for pair in ( + (0x80, 0x01), + (0xFF, 0x01), + (0x00, 0x00), + (0x91, self._stop_variable), + (0x00, 0x01), + (0xFF, 0x00), + (0x80, 0x00), + (_SYSRANGE_START, 0x01), + ): + self._write_u8(pair[0], pair[1]) + start = utime.gmtime() + while (self._read_u8(_SYSRANGE_START) & 0x01) > 0: + if (self.io_timeout_s > 0 and (utime.gmtime() - start) >= self.io_timeout_s): + raise RuntimeError("Timeout waiting for VL53L0X!") + start = utime.gmtime() + while (self._read_u8(_RESULT_INTERRUPT_STATUS) & 0x07) == 0: + if (self.io_timeout_s > 0 and (utime.gmtime() - start) >= self.io_timeout_s): + raise RuntimeError("Timeout waiting for VL53L0X!") + # assumptions: Linearity Corrective Gain is 1000 (default) + # fractional ranging is not enabled + range_mm = self._read_u16(_RESULT_RANGE_STATUS + 10) + self._write_u8(_SYSTEM_INTERRUPT_CLEAR, 0x01) + return range_mm + + def set_address(self, new_address): + """Set a new I2C address to the instantaited object. This is only called when using + multiple VL53L0X sensors on the same I2C bus (SDA & SCL pins). See also the + `example `_ for proper usage. + :param int new_address: The 7-bit `int` that is to be assigned to the VL53L0X sensor. + The address that is assigned should NOT be already in use by another device on the + I2C bus. + .. important:: To properly set the address to an individual VL53L0X sensor, you must + first ensure that all other VL53L0X sensors (using the default address of ``0x29``) + on the same I2C bus are in their off state by pulling the "SHDN" pins LOW. When the + "SHDN" pin is pulled HIGH again the default I2C address is ``0x29``. + """ + self._i2c.write(_I2C_SLAVE_DEVICE_ADDRESS, new_address & 0x7F)