From 6aa2d756c66ff850bf09ebcf645d11ae6dd8c4c1 Mon Sep 17 00:00:00 2001 From: Kai Vogelgesang Date: Mon, 7 Jul 2025 15:50:41 +0200 Subject: [PATCH] Refactor into multiple files --- pico/src/log.py | 45 ++++++++++++++++ pico/src/main.py | 96 +++++++++------------------------- pico/src/sensors/ultrasonic.py | 28 ++++++++++ pico/src/statusled.py | 32 ++++++++++++ 4 files changed, 131 insertions(+), 70 deletions(-) create mode 100644 pico/src/log.py create mode 100644 pico/src/sensors/ultrasonic.py create mode 100644 pico/src/statusled.py diff --git a/pico/src/log.py b/pico/src/log.py new file mode 100644 index 0000000..de2002e --- /dev/null +++ b/pico/src/log.py @@ -0,0 +1,45 @@ +def _color(n: int): + return f"\033[{n}m" + +class Color: + RED: str = _color(31) + GREEN: str = _color(32) + YELLOW: str = _color(33) + BLUE: str = _color(34) + MAGENTA: str = _color(35) + CYAN: str = _color(36) + RESET: str = _color(0) + + +class Logger: + tag: str + color: str | None + + def __init__(self, tag: str, color: str | None = None): + self.tag = tag + self.color = color + + def info(self, message: str): + self._impl("*", None, message) + + def success(self, message: str): + self._impl("+", Color.GREEN, message) + + def warning(self, message: str): + self._impl("w", Color.YELLOW, message) + + def error(self, message: str): + self._impl("!", Color.RED, message) + + def _impl(self, symbol: str, color: str | None, message: str): + prefix_len = len(symbol) + len(self.tag) + 3 # "[t|s]" + + tag = f"{self.color}{self.tag}{Color.RESET}" if self.color else self.tag + symbol = f"{color}{symbol}{Color.RESET}" if color else symbol + prefix = f"[{tag}|{symbol}]" + padding = prefix_len * " " + + first = True + for line in message.splitlines(): + print(prefix if first else padding, line) + first = False \ No newline at end of file diff --git a/pico/src/main.py b/pico/src/main.py index 2dc39f3..f62539d 100644 --- a/pico/src/main.py +++ b/pico/src/main.py @@ -1,78 +1,22 @@ -from machine import Pin -import machine import network -import time import asyncio -class StatusLED: - led: Pin - _status: int - _period: float - _duty_cycle: float - - def __init__(self): - self.led = Pin("LED", Pin.OUT) - self._status = 1 - self._period = 1 - self._duty_cycle = 0.5 - - def status(self, status: int): - self._status = status - - - async def task(self): - while True: - t_1 = self._duty_cycle * self._period - t_2 = (1 - self._duty_cycle) * self._period - - t_blink = t_1 / self._status / 2 - for _ in range(self._status): - self.led.value(1) - await asyncio.sleep(t_blink) - self.led.value(0) - await asyncio.sleep(t_blink) - - await asyncio.sleep(t_2) +from statusled import StatusLED +from log import Logger, Color +from sensors.ultrasonic import UltraSonicSensor STATUS_LED = StatusLED() - -class UltraSonicSensor: - """ - See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/ - """ - - tx: Pin - rx: Pin - - def __init__(self, tx: int, rx: int): - self.tx = Pin(tx, Pin.OUT) - self.rx = Pin(rx, Pin.IN) - - def query_mm(self): - self.tx.value(0) - time.sleep_us(5) - self.tx.value(1) - time.sleep_us(10) - self.tx.value(0) - - pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30) - if pulse_time < 0: - return None - - return pulse_time * 100 // 582 - - - async def ensure_network(): from secret import SSID, PASSWORD + log = Logger("net", Color.BLUE) nic = network.WLAN(network.STA_IF) - print("[net] setting up WLAN interface...") + log.info("setting up WLAN interface...") nic.active(True) while not nic.active(): await asyncio.sleep(0.1) - print("[net] connecting...") + log.info("connecting...") nic.connect(SSID, PASSWORD) logged = False @@ -90,29 +34,41 @@ async def ensure_network(): STATUS_LED.status(2) if not logged: logged = True - print(f"[net] Got IP: {nic.ifconfig()!r}") + log.success(f"Got IP: {nic.ifconfig()!r}") rssi = nic.status("rssi") # pyright: ignore[reportAny] - print(f"[net] {rssi=}") + log.info(f"{rssi=}") else: + logged = False STATUS_LED.status(1) - print("[net] status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny] + log.warning("status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny] if status == network.STAT_CONNECT_FAIL or status == network.STAT_NO_AP_FOUND: - print("[net] retrying in 3s...") + log.info("retrying in 3s...") await asyncio.sleep(3) nic.connect(SSID, PASSWORD) await asyncio.sleep(1) -async def _main(): - asyncio.create_task(STATUS_LED.task()) - asyncio.create_task(ensure_network()) +async def read_sensor(): + log = Logger("tank", Color.CYAN) sensor = UltraSonicSensor(22, 21) while True: d = sensor.query_mm() - print(f"d: {d} mm") + if d is None: + log.warning("distance unreadable!") + else: + log.info(f"distance: {d}mm") await asyncio.sleep(0.5) + +async def _main(): + await asyncio.gather( + STATUS_LED.task(), + ensure_network(), + read_sensor(), + ) + + def main(): asyncio.run(_main()) diff --git a/pico/src/sensors/ultrasonic.py b/pico/src/sensors/ultrasonic.py new file mode 100644 index 0000000..0155c46 --- /dev/null +++ b/pico/src/sensors/ultrasonic.py @@ -0,0 +1,28 @@ +import machine +import time + +class UltraSonicSensor: + """ + See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/ + """ + + tx: machine.Pin + rx: machine.Pin + + def __init__(self, tx: int, rx: int): + self.tx = machine.Pin(tx, machine.Pin.OUT) + self.rx = machine.Pin(rx, machine.Pin.IN) + + def query_mm(self): + self.tx.value(0) + time.sleep_us(5) + self.tx.value(1) + time.sleep_us(10) + self.tx.value(0) + + pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30) + if pulse_time < 0: + return None + + return pulse_time * 100 // 582 + \ No newline at end of file diff --git a/pico/src/statusled.py b/pico/src/statusled.py new file mode 100644 index 0000000..a8f18b9 --- /dev/null +++ b/pico/src/statusled.py @@ -0,0 +1,32 @@ +from machine import Pin +import asyncio + +class StatusLED: + led: Pin + _status: int + _period: float + _duty_cycle: float + + def __init__(self): + self.led = Pin("LED", Pin.OUT) + self._status = 1 + self._period = 1 + self._duty_cycle = 0.5 + + def status(self, status: int): + self._status = status + + + async def task(self): + while True: + t_1 = self._duty_cycle * self._period + t_2 = (1 - self._duty_cycle) * self._period + + t_blink = t_1 / self._status / 2 + for _ in range(self._status): + self.led.value(1) + await asyncio.sleep(t_blink) + self.led.value(0) + await asyncio.sleep(t_blink) + + await asyncio.sleep(t_2)