Compare commits
4 Commits
1132335ea8
...
c37d349608
Author | SHA1 | Date | |
---|---|---|---|
c37d349608 | |||
0365b4c3e2 | |||
d1199575ff | |||
6aa2d756c6 |
1
pico/.gitignore
vendored
1
pico/.gitignore
vendored
@ -1,3 +1,4 @@
|
|||||||
|
build
|
||||||
.venv
|
.venv
|
||||||
typings
|
typings
|
||||||
src/secret.py
|
src/secret.py
|
@ -1,6 +1,27 @@
|
|||||||
|
SRC_DIR := src
|
||||||
|
BUILD_DIR := build
|
||||||
|
|
||||||
|
SRC_FILES := $(wildcard $(SRC_DIR)/*.py) $(wildcard $(SRC_DIR)/**/*.py)
|
||||||
|
BUILD_FILES := $(patsubst $(SRC_DIR)/%.py,$(BUILD_DIR)/%.mpy, $(SRC_FILES))
|
||||||
|
|
||||||
|
$(BUILD_DIR)/%.mpy: $(SRC_DIR)/%.py
|
||||||
|
@mkdir -p `dirname $@`
|
||||||
|
mpy-cross $< -o $@
|
||||||
|
|
||||||
|
.PHONY: build
|
||||||
|
build: $(BUILD_FILES)
|
||||||
|
|
||||||
|
.PHONY: clean
|
||||||
|
clean:
|
||||||
|
rm -rf $(BUILD_DIR)
|
||||||
|
|
||||||
.PHONY: deploy
|
.PHONY: deploy
|
||||||
deploy:
|
deploy: $(BUILD_FILES)
|
||||||
mpremote fs cp src/* :
|
mpremote fs cp -r $(BUILD_DIR)/* :
|
||||||
|
|
||||||
|
.PHONY: purge
|
||||||
|
purge:
|
||||||
|
mpremote run purge.py
|
||||||
|
|
||||||
.PHONY: run
|
.PHONY: run
|
||||||
run: deploy
|
run: deploy
|
||||||
|
@ -1,6 +1,20 @@
|
|||||||
# Setup:
|
# Setup:
|
||||||
|
1. Set up the virtual environment:
|
||||||
```
|
```
|
||||||
$ python -m venv .venv
|
$ python -m venv .venv
|
||||||
$ source .venv/bin/activate
|
$ source .venv/bin/activate
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Install the type stubs:
|
||||||
|
```
|
||||||
$ pip install -U micropython-rp2-pico_w-stubs --no-user --target typings
|
$ pip install -U micropython-rp2-pico_w-stubs --no-user --target typings
|
||||||
```
|
```
|
||||||
|
|
||||||
|
3. Ensure `mpy-cross` is available:
|
||||||
|
```
|
||||||
|
$ git clone https://github.com/micropython/micropython.git /tmp/micropython
|
||||||
|
$ pushd /tmp/micropython/mpy-cross
|
||||||
|
$ make
|
||||||
|
$ mv build/mpy-cross ~/.local/bin/
|
||||||
|
$ popd
|
||||||
|
```
|
14
pico/purge.py
Normal file
14
pico/purge.py
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
def rm(p: str, cwd: str = "/"):
|
||||||
|
path = cwd+p
|
||||||
|
print(f"rm {path=}")
|
||||||
|
try:
|
||||||
|
os.unlink(path)
|
||||||
|
except OSError:
|
||||||
|
for f in os.listdir(path): # pyright: ignore[reportAny]
|
||||||
|
rm(f, path+"/") # pyright: ignore[reportAny]
|
||||||
|
os.unlink(path)
|
||||||
|
|
||||||
|
for f in os.listdir(): # pyright: ignore[reportAny]
|
||||||
|
rm(f) # pyright: ignore[reportAny]
|
46
pico/src/connection.py
Normal file
46
pico/src/connection.py
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
import network
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from log import Logger, Color
|
||||||
|
from statusled import STATUS_LED
|
||||||
|
|
||||||
|
async def ensure_network():
|
||||||
|
from secret import SSID, PASSWORD
|
||||||
|
log = Logger("net", Color.BLUE)
|
||||||
|
|
||||||
|
nic = network.WLAN(network.STA_IF)
|
||||||
|
log.info("setting up WLAN interface...")
|
||||||
|
nic.active(True)
|
||||||
|
while not nic.active():
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
log.info("connecting...")
|
||||||
|
nic.connect(SSID, PASSWORD)
|
||||||
|
|
||||||
|
logged = False
|
||||||
|
|
||||||
|
status_name = {
|
||||||
|
network.STAT_CONNECT_FAIL: "CONNECT_FAIL",
|
||||||
|
network.STAT_CONNECTING: "CONNECTING",
|
||||||
|
network.STAT_IDLE: "IDLE",
|
||||||
|
network.STAT_NO_AP_FOUND: "NO_AP_FOUND",
|
||||||
|
network.STAT_WRONG_PASSWORD: "WRONG_PASSWORD"
|
||||||
|
}
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if (status := nic.status()) == network.STAT_GOT_IP: # pyright: ignore[reportAny]
|
||||||
|
STATUS_LED.status(2)
|
||||||
|
if not logged:
|
||||||
|
logged = True
|
||||||
|
log.success(f"Got IP: {nic.ifconfig()!r}")
|
||||||
|
rssi = nic.status("rssi") # pyright: ignore[reportAny]
|
||||||
|
log.info(f"{rssi=}")
|
||||||
|
else:
|
||||||
|
logged = False
|
||||||
|
STATUS_LED.status(1)
|
||||||
|
log.warning("status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny]
|
||||||
|
if status == network.STAT_CONNECT_FAIL or status == network.STAT_NO_AP_FOUND:
|
||||||
|
log.info("retrying in 3s...")
|
||||||
|
await asyncio.sleep(3)
|
||||||
|
nic.connect(SSID, PASSWORD)
|
||||||
|
|
||||||
|
await asyncio.sleep(1)
|
45
pico/src/log.py
Normal file
45
pico/src/log.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
def _color(n: int):
|
||||||
|
return f"\033[{n}m"
|
||||||
|
|
||||||
|
class Color:
|
||||||
|
RED: str = _color(31)
|
||||||
|
GREEN: str = _color(32)
|
||||||
|
YELLOW: str = _color(33)
|
||||||
|
BLUE: str = _color(34)
|
||||||
|
MAGENTA: str = _color(35)
|
||||||
|
CYAN: str = _color(36)
|
||||||
|
RESET: str = _color(0)
|
||||||
|
|
||||||
|
|
||||||
|
class Logger:
|
||||||
|
tag: str
|
||||||
|
color: str | None
|
||||||
|
|
||||||
|
def __init__(self, tag: str, color: str | None = None):
|
||||||
|
self.tag = tag
|
||||||
|
self.color = color
|
||||||
|
|
||||||
|
def info(self, message: str):
|
||||||
|
self._impl("*", None, message)
|
||||||
|
|
||||||
|
def success(self, message: str):
|
||||||
|
self._impl("+", Color.GREEN, message)
|
||||||
|
|
||||||
|
def warning(self, message: str):
|
||||||
|
self._impl("w", Color.YELLOW, message)
|
||||||
|
|
||||||
|
def error(self, message: str):
|
||||||
|
self._impl("!", Color.RED, message)
|
||||||
|
|
||||||
|
def _impl(self, symbol: str, color: str | None, message: str):
|
||||||
|
prefix_len = len(symbol) + len(self.tag) + 3 # "[t|s]"
|
||||||
|
|
||||||
|
tag = f"{self.color}{self.tag}{Color.RESET}" if self.color else self.tag
|
||||||
|
symbol = f"{color}{symbol}{Color.RESET}" if color else symbol
|
||||||
|
prefix = f"[{tag}|{symbol}]"
|
||||||
|
padding = prefix_len * " "
|
||||||
|
|
||||||
|
first = True
|
||||||
|
for line in message.splitlines():
|
||||||
|
print(prefix if first else padding, line)
|
||||||
|
first = False
|
129
pico/src/main.py
129
pico/src/main.py
@ -1,118 +1,39 @@
|
|||||||
from machine import Pin
|
|
||||||
import machine
|
|
||||||
import network
|
|
||||||
import time
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
class StatusLED:
|
from vendor.microdot import Microdot, Request
|
||||||
led: Pin
|
|
||||||
_status: int
|
|
||||||
_period: float
|
|
||||||
_duty_cycle: float
|
|
||||||
|
|
||||||
def __init__(self):
|
from statusled import STATUS_LED
|
||||||
self.led = Pin("LED", Pin.OUT)
|
from log import Logger, Color
|
||||||
self._status = 1
|
from sensors.ultrasonic import UltraSonicSensor
|
||||||
self._period = 1
|
from connection import ensure_network
|
||||||
self._duty_cycle = 0.5
|
|
||||||
|
|
||||||
def status(self, status: int):
|
app = Microdot()
|
||||||
self._status = status
|
|
||||||
|
|
||||||
|
@app.route("/")
|
||||||
|
async def index(_request: Request):
|
||||||
|
return "Hello from pico"
|
||||||
|
|
||||||
async def task(self):
|
async def read_sensor():
|
||||||
while True:
|
log = Logger("tank", Color.CYAN)
|
||||||
t_1 = self._duty_cycle * self._period
|
|
||||||
t_2 = (1 - self._duty_cycle) * self._period
|
|
||||||
|
|
||||||
t_blink = t_1 / self._status / 2
|
|
||||||
for _ in range(self._status):
|
|
||||||
self.led.value(1)
|
|
||||||
await asyncio.sleep(t_blink)
|
|
||||||
self.led.value(0)
|
|
||||||
await asyncio.sleep(t_blink)
|
|
||||||
|
|
||||||
await asyncio.sleep(t_2)
|
|
||||||
|
|
||||||
STATUS_LED = StatusLED()
|
|
||||||
|
|
||||||
|
|
||||||
class UltraSonicSensor:
|
|
||||||
"""
|
|
||||||
See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/
|
|
||||||
"""
|
|
||||||
|
|
||||||
tx: Pin
|
|
||||||
rx: Pin
|
|
||||||
|
|
||||||
def __init__(self, tx: int, rx: int):
|
|
||||||
self.tx = Pin(tx, Pin.OUT)
|
|
||||||
self.rx = Pin(rx, Pin.IN)
|
|
||||||
|
|
||||||
def query_mm(self):
|
|
||||||
self.tx.value(0)
|
|
||||||
time.sleep_us(5)
|
|
||||||
self.tx.value(1)
|
|
||||||
time.sleep_us(10)
|
|
||||||
self.tx.value(0)
|
|
||||||
|
|
||||||
pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30)
|
|
||||||
if pulse_time < 0:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return pulse_time * 100 // 582
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def ensure_network():
|
|
||||||
from secret import SSID, PASSWORD
|
|
||||||
|
|
||||||
nic = network.WLAN(network.STA_IF)
|
|
||||||
print("[net] setting up WLAN interface...")
|
|
||||||
nic.active(True)
|
|
||||||
while not nic.active():
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
print("[net] connecting...")
|
|
||||||
nic.connect(SSID, PASSWORD)
|
|
||||||
|
|
||||||
logged = False
|
|
||||||
|
|
||||||
status_name = {
|
|
||||||
network.STAT_CONNECT_FAIL: "CONNECT_FAIL",
|
|
||||||
network.STAT_CONNECTING: "CONNECTING",
|
|
||||||
network.STAT_IDLE: "IDLE",
|
|
||||||
network.STAT_NO_AP_FOUND: "NO_AP_FOUND",
|
|
||||||
network.STAT_WRONG_PASSWORD: "WRONG_PASSWORD"
|
|
||||||
}
|
|
||||||
|
|
||||||
while True:
|
|
||||||
if (status := nic.status()) == network.STAT_GOT_IP: # pyright: ignore[reportAny]
|
|
||||||
STATUS_LED.status(2)
|
|
||||||
if not logged:
|
|
||||||
logged = True
|
|
||||||
print(f"[net] Got IP: {nic.ifconfig()!r}")
|
|
||||||
rssi = nic.status("rssi") # pyright: ignore[reportAny]
|
|
||||||
print(f"[net] {rssi=}")
|
|
||||||
else:
|
|
||||||
STATUS_LED.status(1)
|
|
||||||
print("[net] status={}".format(status_name.get(status, f"UNKNOWN ({status})"))) # pyright: ignore[reportAny]
|
|
||||||
if status == network.STAT_CONNECT_FAIL or status == network.STAT_NO_AP_FOUND:
|
|
||||||
print("[net] retrying in 3s...")
|
|
||||||
await asyncio.sleep(3)
|
|
||||||
nic.connect(SSID, PASSWORD)
|
|
||||||
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
|
|
||||||
|
|
||||||
async def _main():
|
|
||||||
asyncio.create_task(STATUS_LED.task())
|
|
||||||
asyncio.create_task(ensure_network())
|
|
||||||
sensor = UltraSonicSensor(22, 21)
|
sensor = UltraSonicSensor(22, 21)
|
||||||
while True:
|
while True:
|
||||||
d = sensor.query_mm()
|
d = sensor.query_mm()
|
||||||
print(f"d: {d} mm")
|
if d is None:
|
||||||
|
log.warning("distance unreadable!")
|
||||||
|
else:
|
||||||
|
log.info(f"distance: {d}mm")
|
||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
async def _main():
|
||||||
|
await asyncio.gather(
|
||||||
|
STATUS_LED.task(),
|
||||||
|
ensure_network(),
|
||||||
|
read_sensor(),
|
||||||
|
app.start_server(port=80)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
asyncio.run(_main())
|
asyncio.run(_main())
|
||||||
|
|
||||||
|
28
pico/src/sensors/ultrasonic.py
Normal file
28
pico/src/sensors/ultrasonic.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
import machine
|
||||||
|
import time
|
||||||
|
|
||||||
|
class UltraSonicSensor:
|
||||||
|
"""
|
||||||
|
See https://randomnerdtutorials.com/micropython-hc-sr04-ultrasonic-esp32-esp8266/
|
||||||
|
"""
|
||||||
|
|
||||||
|
tx: machine.Pin
|
||||||
|
rx: machine.Pin
|
||||||
|
|
||||||
|
def __init__(self, tx: int, rx: int):
|
||||||
|
self.tx = machine.Pin(tx, machine.Pin.OUT)
|
||||||
|
self.rx = machine.Pin(rx, machine.Pin.IN)
|
||||||
|
|
||||||
|
def query_mm(self):
|
||||||
|
self.tx.value(0)
|
||||||
|
time.sleep_us(5)
|
||||||
|
self.tx.value(1)
|
||||||
|
time.sleep_us(10)
|
||||||
|
self.tx.value(0)
|
||||||
|
|
||||||
|
pulse_time = machine.time_pulse_us(self.rx, 1, 500*2*30)
|
||||||
|
if pulse_time < 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return pulse_time * 100 // 582
|
||||||
|
|
34
pico/src/statusled.py
Normal file
34
pico/src/statusled.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
from machine import Pin
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
class StatusLED:
|
||||||
|
led: Pin
|
||||||
|
_status: int
|
||||||
|
_period: float
|
||||||
|
_duty_cycle: float
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.led = Pin("LED", Pin.OUT)
|
||||||
|
self._status = 1
|
||||||
|
self._period = 1
|
||||||
|
self._duty_cycle = 0.5
|
||||||
|
|
||||||
|
def status(self, status: int):
|
||||||
|
self._status = status
|
||||||
|
|
||||||
|
|
||||||
|
async def task(self):
|
||||||
|
while True:
|
||||||
|
t_1 = self._duty_cycle * self._period
|
||||||
|
t_2 = (1 - self._duty_cycle) * self._period
|
||||||
|
|
||||||
|
t_blink = t_1 / self._status / 2
|
||||||
|
for _ in range(self._status):
|
||||||
|
self.led.value(1)
|
||||||
|
await asyncio.sleep(t_blink)
|
||||||
|
self.led.value(0)
|
||||||
|
await asyncio.sleep(t_blink)
|
||||||
|
|
||||||
|
await asyncio.sleep(t_2)
|
||||||
|
|
||||||
|
STATUS_LED = StatusLED()
|
1532
pico/src/vendor/microdot.py
vendored
Normal file
1532
pico/src/vendor/microdot.py
vendored
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user