Refactor into multiple files
This commit is contained in:
parent
1132335ea8
commit
6aa2d756c6
45
pico/src/log.py
Normal file
45
pico/src/log.py
Normal file
@ -0,0 +1,45 @@
|
||||
def _color(n: int):
|
||||
return f"\033[{n}m"
|
||||
|
||||
class Color:
|
||||
RED: str = _color(31)
|
||||
GREEN: str = _color(32)
|
||||
YELLOW: str = _color(33)
|
||||
BLUE: str = _color(34)
|
||||
MAGENTA: str = _color(35)
|
||||
CYAN: str = _color(36)
|
||||
RESET: str = _color(0)
|
||||
|
||||
|
||||
class Logger:
|
||||
tag: str
|
||||
color: str | None
|
||||
|
||||
def __init__(self, tag: str, color: str | None = None):
|
||||
self.tag = tag
|
||||
self.color = color
|
||||
|
||||
def info(self, message: str):
|
||||
self._impl("*", None, message)
|
||||
|
||||
def success(self, message: str):
|
||||
self._impl("+", Color.GREEN, message)
|
||||
|
||||
def warning(self, message: str):
|
||||
self._impl("w", Color.YELLOW, message)
|
||||
|
||||
def error(self, message: str):
|
||||
self._impl("!", Color.RED, message)
|
||||
|
||||
def _impl(self, symbol: str, color: str | None, message: str):
|
||||
prefix_len = len(symbol) + len(self.tag) + 3 # "[t|s]"
|
||||
|
||||
tag = f"{self.color}{self.tag}{Color.RESET}" if self.color else self.tag
|
||||
symbol = f"{color}{symbol}{Color.RESET}" if color else symbol
|
||||
prefix = f"[{tag}|{symbol}]"
|
||||
padding = prefix_len * " "
|
||||
|
||||
first = True
|
||||
for line in message.splitlines():
|
||||
print(prefix if first else padding, line)
|
||||
first = False
|
@ -1,78 +1,22 @@
|
||||
from machine import Pin
|
||||
import machine
|
||||
import network
|
||||
import time
|
||||
import asyncio
|
||||
|
||||
class StatusLED:
|
||||
led: Pin
|
||||
_status: int
|
||||
_period: float
|
||||
_duty_cycle: float
|
||||
|
||||
def __init__(self):
|
||||
self.led = Pin("LED", Pin.OUT)
|
||||
self._status = 1
|
||||
self._period = 1
|
||||
self._duty_cycle = 0.5
|
||||
|
||||
def status(self, status: int):
|
||||
self._status = status
|
||||
|
||||
|
||||
async def task(self):
|
||||
while True:
|
||||
t_1 = self._duty_cycle * self._period
|
||||
t_2 = (1 - self._duty_cycle) * self._period
|
||||
|
||||
t_blink = t_1 / self._status / 2
|
||||
for _ in range(self._status):
|
||||
self.led.value(1)
|
||||
await asyncio.sleep(t_blink)
|
||||
self.led.value(0)
|
||||
await asyncio.sleep(t_blink)
|
||||
|
||||
await asyncio.sleep(t_2)
|
||||
from statusled import StatusLED
|
||||
from log import Logger, Color
|
||||
from sensors.ultrasonic import UltraSonicSensor
|
||||
|
||||
STATUS_LED = StatusLED()
|
||||
|
||||
|
||||
class UltraSonicSensor:
|
||||
"""
|
||||
See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/
|
||||
"""
|
||||
|
||||
tx: Pin
|
||||
rx: Pin
|
||||
|
||||
def __init__(self, tx: int, rx: int):
|
||||
self.tx = Pin(tx, Pin.OUT)
|
||||
self.rx = Pin(rx, Pin.IN)
|
||||
|
||||
def query_mm(self):
|
||||
self.tx.value(0)
|
||||
time.sleep_us(5)
|
||||
self.tx.value(1)
|
||||
time.sleep_us(10)
|
||||
self.tx.value(0)
|
||||
|
||||
pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30)
|
||||
if pulse_time < 0:
|
||||
return None
|
||||
|
||||
return pulse_time * 100 // 582
|
||||
|
||||
|
||||
|
||||
async def ensure_network():
|
||||
from secret import SSID, PASSWORD
|
||||
log = Logger("net", Color.BLUE)
|
||||
|
||||
nic = network.WLAN(network.STA_IF)
|
||||
print("[net] setting up WLAN interface...")
|
||||
log.info("setting up WLAN interface...")
|
||||
nic.active(True)
|
||||
while not nic.active():
|
||||
await asyncio.sleep(0.1)
|
||||
print("[net] connecting...")
|
||||
log.info("connecting...")
|
||||
nic.connect(SSID, PASSWORD)
|
||||
|
||||
logged = False
|
||||
@ -90,29 +34,41 @@ async def ensure_network():
|
||||
STATUS_LED.status(2)
|
||||
if not logged:
|
||||
logged = True
|
||||
print(f"[net] Got IP: {nic.ifconfig()!r}")
|
||||
log.success(f"Got IP: {nic.ifconfig()!r}")
|
||||
rssi = nic.status("rssi") # pyright: ignore[reportAny]
|
||||
print(f"[net] {rssi=}")
|
||||
log.info(f"{rssi=}")
|
||||
else:
|
||||
logged = False
|
||||
STATUS_LED.status(1)
|
||||
print("[net] status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny]
|
||||
log.warning("status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny]
|
||||
if status == network.STAT_CONNECT_FAIL or status == network.STAT_NO_AP_FOUND:
|
||||
print("[net] retrying in 3s...")
|
||||
log.info("retrying in 3s...")
|
||||
await asyncio.sleep(3)
|
||||
nic.connect(SSID, PASSWORD)
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def _main():
|
||||
asyncio.create_task(STATUS_LED.task())
|
||||
asyncio.create_task(ensure_network())
|
||||
async def read_sensor():
|
||||
log = Logger("tank", Color.CYAN)
|
||||
sensor = UltraSonicSensor(22, 21)
|
||||
while True:
|
||||
d = sensor.query_mm()
|
||||
print(f"d: {d} mm")
|
||||
if d is None:
|
||||
log.warning("distance unreadable!")
|
||||
else:
|
||||
log.info(f"distance: {d}mm")
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
|
||||
async def _main():
|
||||
await asyncio.gather(
|
||||
STATUS_LED.task(),
|
||||
ensure_network(),
|
||||
read_sensor(),
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
asyncio.run(_main())
|
||||
|
||||
|
28
pico/src/sensors/ultrasonic.py
Normal file
28
pico/src/sensors/ultrasonic.py
Normal file
@ -0,0 +1,28 @@
|
||||
import machine
|
||||
import time
|
||||
|
||||
class UltraSonicSensor:
|
||||
"""
|
||||
See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/
|
||||
"""
|
||||
|
||||
tx: machine.Pin
|
||||
rx: machine.Pin
|
||||
|
||||
def __init__(self, tx: int, rx: int):
|
||||
self.tx = machine.Pin(tx, machine.Pin.OUT)
|
||||
self.rx = machine.Pin(rx, machine.Pin.IN)
|
||||
|
||||
def query_mm(self):
|
||||
self.tx.value(0)
|
||||
time.sleep_us(5)
|
||||
self.tx.value(1)
|
||||
time.sleep_us(10)
|
||||
self.tx.value(0)
|
||||
|
||||
pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30)
|
||||
if pulse_time < 0:
|
||||
return None
|
||||
|
||||
return pulse_time * 100 // 582
|
||||
|
32
pico/src/statusled.py
Normal file
32
pico/src/statusled.py
Normal file
@ -0,0 +1,32 @@
|
||||
from machine import Pin
|
||||
import asyncio
|
||||
|
||||
class StatusLED:
|
||||
led: Pin
|
||||
_status: int
|
||||
_period: float
|
||||
_duty_cycle: float
|
||||
|
||||
def __init__(self):
|
||||
self.led = Pin("LED", Pin.OUT)
|
||||
self._status = 1
|
||||
self._period = 1
|
||||
self._duty_cycle = 0.5
|
||||
|
||||
def status(self, status: int):
|
||||
self._status = status
|
||||
|
||||
|
||||
async def task(self):
|
||||
while True:
|
||||
t_1 = self._duty_cycle * self._period
|
||||
t_2 = (1 - self._duty_cycle) * self._period
|
||||
|
||||
t_blink = t_1 / self._status / 2
|
||||
for _ in range(self._status):
|
||||
self.led.value(1)
|
||||
await asyncio.sleep(t_blink)
|
||||
self.led.value(0)
|
||||
await asyncio.sleep(t_blink)
|
||||
|
||||
await asyncio.sleep(t_2)
|
Loading…
Reference in New Issue
Block a user