Implement DMX serial output

This commit is contained in:
Kai Vogelgesang 2022-03-22 21:35:16 +01:00
parent 62636fa2f9
commit 6f6e0fd665
Signed by: kai
GPG Key ID: 0A95D3B6E62C0879

View File

@ -1,5 +1,8 @@
import asyncio import asyncio
from typing import * from typing import *
from queue import Queue
from threading import Thread, Lock
import time
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
@ -7,6 +10,7 @@ from fastapi.responses import FileResponse
from pydantic import BaseModel, Field, ValidationError from pydantic import BaseModel, Field, ValidationError
import serial
app = FastAPI() app = FastAPI()
@ -156,7 +160,80 @@ async def ws_handler(ws: WebSocket):
app.mount("/", StaticFiles(directory="frontend", html=True)) app.mount("/", StaticFiles(directory="frontend", html=True))
dmx_data_lock = Lock()
dmx_data = [0 for _ in range(len(dmx_state))]
async def dmx_watcher():
while True:
with dmx_data_lock:
for (i, slider) in enumerate(dmx_state):
dmx_data[i] = slider.value
await asyncio.sleep(1/50)
class DmxWriter(Thread):
def __init__(self):
super().__init__()
self.running = True
def run(self):
FPS = 50
FRAME_TIME = 1 / FPS
with serial.Serial("/dev/ttyUSB0", 500_000) as ser:
payload = bytearray(512)
def sync():
# wait for sync
while True:
b = ser.readline()
if b.strip() == b"Sync.":
return
sync()
print("initial sync.")
while self.running:
loop_start = time.time()
with dmx_data_lock:
for (i, value) in enumerate(dmx_data):
payload[i] = value
ser.write(payload)
ser.flush()
response = ser.readline()
if response.strip() != b"Ack.":
print(f"received bad response: {response!r}")
sync()
continue
loop_time = time.time() - loop_start
if loop_time < FRAME_TIME:
time.sleep(FRAME_TIME - loop_time)
else:
print("loop took too long!")
print(f"loop time: {1000 * loop_time:0.2f}ms busy, {1000 * (time.time() - loop_start):0.2f}ms total")
def stop(self):
self.running = False
dmx_writer = DmxWriter()
@app.on_event("startup") @app.on_event("startup")
async def on_startup(): async def on_startup():
asyncio.create_task(socket_manager.watch_auto_release()) asyncio.create_task(socket_manager.watch_auto_release())
asyncio.create_task(dmx_watcher())
dmx_writer.start()
@app.on_event("shutdown")
async def on_shutdown():
print("shutdown")
dmx_writer.stop()
dmx_writer.join()