Compare commits

...

4 Commits

10 changed files with 1767 additions and 111 deletions

1
pico/.gitignore vendored
View File

@ -1,3 +1,4 @@
build
.venv
typings
src/secret.py

View File

@ -1,6 +1,27 @@
SRC_DIR := src
BUILD_DIR := build
SRC_FILES := $(wildcard $(SRC_DIR)/*.py) $(wildcard $(SRC_DIR)/**/*.py)
BUILD_FILES := $(patsubst $(SRC_DIR)/%.py,$(BUILD_DIR)/%.mpy, $(SRC_FILES))
$(BUILD_DIR)/%.mpy: $(SRC_DIR)/%.py
@mkdir -p `dirname $@`
mpy-cross $< -o $@
.PHONY: build
build: $(BUILD_FILES)
.PHONY: clean
clean:
rm -rf $(BUILD_DIR)
.PHONY: deploy
deploy:
mpremote fs cp src/* :
deploy: $(BUILD_FILES)
mpremote fs cp -r $(BUILD_DIR)/* :
.PHONY: purge
purge:
mpremote run purge.py
.PHONY: run
run: deploy

View File

@ -1,6 +1,20 @@
# Setup:
```
$ python -m venv .venv
$ source .venv/bin/activate
$ pip install -U micropython-rp2-pico_w-stubs --no-user --target typings
```
1. Set up the virtual environment:
```
$ python -m venv .venv
$ source .venv/bin/activate
```
2. Install the type stubs:
```
$ pip install -U micropython-rp2-pico_w-stubs --no-user --target typings
```
3. Ensure `mpy-cross` is available:
```
$ git clone https://github.com/micropython/micropython.git /tmp/micropython
$ pushd /tmp/micropython/mpy-cross
$ make
$ mv build/mpy-cross ~/.local/bin/
$ popd
```

14
pico/purge.py Normal file
View File

@ -0,0 +1,14 @@
import os
def rm(p: str, cwd: str = "/"):
path = cwd+p
print(f"rm {path=}")
try:
os.unlink(path)
except OSError:
for f in os.listdir(path): # pyright: ignore[reportAny]
rm(f, path+"/") # pyright: ignore[reportAny]
os.unlink(path)
for f in os.listdir(): # pyright: ignore[reportAny]
rm(f) # pyright: ignore[reportAny]

46
pico/src/connection.py Normal file
View File

@ -0,0 +1,46 @@
import network
import asyncio
from log import Logger, Color
from statusled import STATUS_LED
async def ensure_network():
from secret import SSID, PASSWORD
log = Logger("net", Color.BLUE)
nic = network.WLAN(network.STA_IF)
log.info("setting up WLAN interface...")
nic.active(True)
while not nic.active():
await asyncio.sleep(0.1)
log.info("connecting...")
nic.connect(SSID, PASSWORD)
logged = False
status_name = {
network.STAT_CONNECT_FAIL: "CONNECT_FAIL",
network.STAT_CONNECTING: "CONNECTING",
network.STAT_IDLE: "IDLE",
network.STAT_NO_AP_FOUND: "NO_AP_FOUND",
network.STAT_WRONG_PASSWORD: "WRONG_PASSWORD"
}
while True:
if (status := nic.status()) == network.STAT_GOT_IP: # pyright: ignore[reportAny]
STATUS_LED.status(2)
if not logged:
logged = True
log.success(f"Got IP: {nic.ifconfig()!r}")
rssi = nic.status("rssi") # pyright: ignore[reportAny]
log.info(f"{rssi=}")
else:
logged = False
STATUS_LED.status(1)
log.warning("status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny]
if status == network.STAT_CONNECT_FAIL or status == network.STAT_NO_AP_FOUND:
log.info("retrying in 3s...")
await asyncio.sleep(3)
nic.connect(SSID, PASSWORD)
await asyncio.sleep(1)

45
pico/src/log.py Normal file
View File

@ -0,0 +1,45 @@
def _color(n: int):
return f"\033[{n}m"
class Color:
RED: str = _color(31)
GREEN: str = _color(32)
YELLOW: str = _color(33)
BLUE: str = _color(34)
MAGENTA: str = _color(35)
CYAN: str = _color(36)
RESET: str = _color(0)
class Logger:
tag: str
color: str | None
def __init__(self, tag: str, color: str | None = None):
self.tag = tag
self.color = color
def info(self, message: str):
self._impl("*", None, message)
def success(self, message: str):
self._impl("+", Color.GREEN, message)
def warning(self, message: str):
self._impl("w", Color.YELLOW, message)
def error(self, message: str):
self._impl("!", Color.RED, message)
def _impl(self, symbol: str, color: str | None, message: str):
prefix_len = len(symbol) + len(self.tag) + 3 # "[t|s]"
tag = f"{self.color}{self.tag}{Color.RESET}" if self.color else self.tag
symbol = f"{color}{symbol}{Color.RESET}" if color else symbol
prefix = f"[{tag}|{symbol}]"
padding = prefix_len * " "
first = True
for line in message.splitlines():
print(prefix if first else padding, line)
first = False

View File

@ -1,118 +1,39 @@
from machine import Pin
import machine
import network
import time
import asyncio
class StatusLED:
led: Pin
_status: int
_period: float
_duty_cycle: float
from vendor.microdot import Microdot, Request
def __init__(self):
self.led = Pin("LED", Pin.OUT)
self._status = 1
self._period = 1
self._duty_cycle = 0.5
from statusled import STATUS_LED
from log import Logger, Color
from sensors.ultrasonic import UltraSonicSensor
from connection import ensure_network
def status(self, status: int):
self._status = status
async def task(self):
while True:
t_1 = self._duty_cycle * self._period
t_2 = (1 - self._duty_cycle) * self._period
app = Microdot()
t_blink = t_1 / self._status / 2
for _ in range(self._status):
self.led.value(1)
await asyncio.sleep(t_blink)
self.led.value(0)
await asyncio.sleep(t_blink)
await asyncio.sleep(t_2)
@app.route("/")
async def index(_request: Request):
return "Hello from pico"
STATUS_LED = StatusLED()
class UltraSonicSensor:
"""
See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/
"""
tx: Pin
rx: Pin
def __init__(self, tx: int, rx: int):
self.tx = Pin(tx, Pin.OUT)
self.rx = Pin(rx, Pin.IN)
def query_mm(self):
self.tx.value(0)
time.sleep_us(5)
self.tx.value(1)
time.sleep_us(10)
self.tx.value(0)
pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30)
if pulse_time < 0:
return None
return pulse_time * 100 // 582
async def ensure_network():
from secret import SSID, PASSWORD
nic = network.WLAN(network.STA_IF)
print("[net] setting up WLAN interface...")
nic.active(True)
while not nic.active():
await asyncio.sleep(0.1)
print("[net] connecting...")
nic.connect(SSID, PASSWORD)
logged = False
status_name = {
network.STAT_CONNECT_FAIL: "CONNECT_FAIL",
network.STAT_CONNECTING: "CONNECTING",
network.STAT_IDLE: "IDLE",
network.STAT_NO_AP_FOUND: "NO_AP_FOUND",
network.STAT_WRONG_PASSWORD: "WRONG_PASSWORD"
}
while True:
if (status := nic.status()) == network.STAT_GOT_IP: # pyright: ignore[reportAny]
STATUS_LED.status(2)
if not logged:
logged = True
print(f"[net] Got IP: {nic.ifconfig()!r}")
rssi = nic.status("rssi") # pyright: ignore[reportAny]
print(f"[net] {rssi=}")
else:
STATUS_LED.status(1)
print("[net] status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny]
if status == network.STAT_CONNECT_FAIL or status == network.STAT_NO_AP_FOUND:
print("[net] retrying in 3s...")
await asyncio.sleep(3)
nic.connect(SSID, PASSWORD)
await asyncio.sleep(1)
async def _main():
asyncio.create_task(STATUS_LED.task())
asyncio.create_task(ensure_network())
async def read_sensor():
log = Logger("tank", Color.CYAN)
sensor = UltraSonicSensor(22, 21)
while True:
d = sensor.query_mm()
print(f"d: {d} mm")
if d is None:
log.warning("distance unreadable!")
else:
log.info(f"distance: {d}mm")
await asyncio.sleep(0.5)
async def _main():
await asyncio.gather(
STATUS_LED.task(),
ensure_network(),
read_sensor(),
app.start_server(port=80)
)
def main():
asyncio.run(_main())

View File

@ -0,0 +1,28 @@
import machine
import time
class UltraSonicSensor:
"""
See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/
"""
tx: machine.Pin
rx: machine.Pin
def __init__(self, tx: int, rx: int):
self.tx = machine.Pin(tx, machine.Pin.OUT)
self.rx = machine.Pin(rx, machine.Pin.IN)
def query_mm(self):
self.tx.value(0)
time.sleep_us(5)
self.tx.value(1)
time.sleep_us(10)
self.tx.value(0)
pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30)
if pulse_time < 0:
return None
return pulse_time * 100 // 582

34
pico/src/statusled.py Normal file
View File

@ -0,0 +1,34 @@
from machine import Pin
import asyncio
class StatusLED:
led: Pin
_status: int
_period: float
_duty_cycle: float
def __init__(self):
self.led = Pin("LED", Pin.OUT)
self._status = 1
self._period = 1
self._duty_cycle = 0.5
def status(self, status: int):
self._status = status
async def task(self):
while True:
t_1 = self._duty_cycle * self._period
t_2 = (1 - self._duty_cycle) * self._period
t_blink = t_1 / self._status / 2
for _ in range(self._status):
self.led.value(1)
await asyncio.sleep(t_blink)
self.led.value(0)
await asyncio.sleep(t_blink)
await asyncio.sleep(t_2)
STATUS_LED = StatusLED()

1532
pico/src/vendor/microdot.py vendored Normal file

File diff suppressed because it is too large Load Diff