"""Shared DMX slider server.

Browser clients connect over a websocket, grab/move/release sliders, and
every state change is broadcast to all clients.  A background thread
streams the current slider values to a DMX interface on a serial port.
"""

import asyncio
from typing import *  # NOTE: wildcard kept so no name the file may rely on is lost
from queue import Queue
from threading import Thread, Lock
import time

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field, ValidationError
import serial

# Number of sliders exposed to clients (one DMX channel each).
NUM_SLIDERS = 8
# Valid range for a single DMX channel value (one byte).
DMX_MIN = 0
DMX_MAX = 255
# Seconds of inactivity after which a grabbed slider is released automatically.
AUTO_RELEASE_SECONDS = 1
# Refresh rate shared by the state snapshot task and the serial writer.
FPS = 50
FRAME_TIME = 1 / FPS
# Serial link settings for the DMX interface.
SERIAL_PORT = "/dev/ttyUSB0"
SERIAL_BAUD = 500_000
# Read timeout so the writer thread can notice a stop request instead of
# blocking forever on a silent device.
SERIAL_TIMEOUT = 1.0

app = FastAPI()


class Slider:
    """State of one slider: its value, current owner and auto-release timer."""

    def __init__(self):
        self.value = 0
        self.owner: Optional[WebSocket] = None
        self.release_timer: Optional[asyncio.Task] = None
        self.release_event = asyncio.Event()

    def cancel_release_timer(self):
        """Cancel the pending auto-release timer, if there is one."""
        if self.release_timer is not None:
            self.release_timer.cancel()
            self.release_timer = None

    def reset_release_timer(self):
        """(Re)start the auto-release countdown from zero."""
        self.cancel_release_timer()
        self.release_timer = asyncio.create_task(self._release_timer())

    async def _release_timer(self):
        """Signal ``release_event`` after the inactivity period has elapsed."""
        await asyncio.sleep(AUTO_RELEASE_SECONDS)
        self.release_event.set()


dmx_state = [Slider() for _ in range(NUM_SLIDERS)]


class GrabAction(BaseModel):
    """Client request to take ownership of a slider."""

    action_type: Literal["grab"]
    # Bounded so a bad index is rejected as a ValidationError instead of
    # raising IndexError (or silently aliasing via negative indexing).
    slider: int = Field(..., ge=0, lt=NUM_SLIDERS)


class ReleaseAction(BaseModel):
    """Client request to give up ownership of a slider."""

    action_type: Literal["release"]
    slider: int = Field(..., ge=0, lt=NUM_SLIDERS)


class MoveAction(BaseModel):
    """Client request to set the value of a slider it owns."""

    action_type: Literal["move"]
    slider: int = Field(..., ge=0, lt=NUM_SLIDERS)
    # Must fit in one byte: the value is written into a bytearray payload.
    new_value: int = Field(..., ge=DMX_MIN, le=DMX_MAX)


class ClientAction(BaseModel):
    """Envelope for a client message; ``action_type`` selects the variant."""

    action: Union[GrabAction, ReleaseAction, MoveAction] = Field(
        ..., discriminator="action_type"
    )


class SocketManager:
    """Tracks connected websockets and applies/broadcasts slider actions."""

    def __init__(self):
        self.sockets = set()

    async def on_connect(self, ws: WebSocket):
        """Register ``ws`` and send it the current slider state."""
        self.sockets.add(ws)
        await self.push_state(ws)

    def on_disconnect(self, ws: WebSocket):
        """Forget ``ws`` and free every slider it still owned."""
        # discard: the socket may never have been registered if the
        # connection failed early.
        self.sockets.discard(ws)
        for slider in dmx_state:
            if slider.owner == ws:
                slider.owner = None
                # The owner is gone, so its pending auto-release is moot.
                slider.cancel_release_timer()

    async def on_action(
        self, ws: WebSocket, action: Union[GrabAction, ReleaseAction, MoveAction]
    ):
        """Apply one client action and broadcast the resulting state.

        Grab succeeds only on an unowned slider; release and move are
        honoured only when ``ws`` is the current owner.
        """
        slider = dmx_state[action.slider]
        if action.action_type == "grab":
            print(f"grab {action.slider}")
            if slider.owner is None:
                slider.owner = ws
                slider.reset_release_timer()
        elif action.action_type == "release":
            print(f"release {action.slider}")
            if slider.owner == ws:
                slider.owner = None
                slider.cancel_release_timer()
        elif action.action_type == "move":
            print(f"move {action.slider} -> {action.new_value}")
            if slider.owner == ws:
                slider.value = action.new_value
                slider.reset_release_timer()
        await self.push_all()

    async def push_state(self, ws: WebSocket):
        """Send ``ws`` every slider's value and its status from ws's view.

        Status is "owned" (by this socket), "locked" (by another socket)
        or "open" (unowned).
        """
        response = []
        for slider in dmx_state:
            if slider.owner == ws:
                status = "owned"
            elif slider.owner is not None:
                status = "locked"
            else:
                status = "open"
            response.append({"value": slider.value, "status": status})
        await ws.send_json(response)

    async def push_all(self):
        """Send the current state to every connected socket.

        A failing send to one socket must not abort the broadcast or
        propagate into the handler of an unrelated client, so failures are
        collected and logged; the failing socket's own handler removes it.
        """
        targets = list(self.sockets)
        results = await asyncio.gather(
            *[self.push_state(ws) for ws in targets], return_exceptions=True
        )
        for result in results:
            if isinstance(result, Exception):
                print(f"push failed: {result!r}")

    async def watch_auto_release(self):
        """Release sliders whose inactivity timer fired; runs forever."""

        async def _watch(slider):
            while True:
                await slider.release_event.wait()
                print("resetteroni")
                slider.release_event.clear()
                slider.owner = None
                # Cancel rather than just drop the reference: a timer
                # re-armed between the event firing and this point would
                # otherwise fire again later and cause a spurious release.
                slider.cancel_release_timer()
                await self.push_all()

        await asyncio.gather(*[_watch(slider) for slider in dmx_state])


socket_manager = SocketManager()


@app.websocket("/ws")
async def ws_handler(ws: WebSocket):
    """Serve one websocket client until it disconnects."""
    await ws.accept()
    try:
        # Inside the try so a failed initial push still unregisters the socket.
        await socket_manager.on_connect(ws)
        while True:
            data = await ws.receive_json()
            try:
                action = ClientAction.parse_obj(data)
                await socket_manager.on_action(ws, action.action)
            except ValidationError as e:
                # Malformed or out-of-range message: report and keep serving.
                print(e)
    except WebSocketDisconnect:
        pass
    finally:
        socket_manager.on_disconnect(ws)
        # Let remaining clients see sliders freed by this disconnect.
        await socket_manager.push_all()


app.mount("/", StaticFiles(directory="frontend", html=True))

# Snapshot of slider values shared between the event loop and the writer
# thread; always accessed under dmx_data_lock.
dmx_data_lock = Lock()
dmx_data = [0 for _ in range(len(dmx_state))]


async def dmx_watcher():
    """Copy slider values into ``dmx_data`` once per frame; runs forever."""
    while True:
        with dmx_data_lock:
            for (i, slider) in enumerate(dmx_state):
                dmx_data[i] = slider.value
        await asyncio.sleep(FRAME_TIME)


class DmxWriter(Thread):
    """Background thread that streams ``dmx_data`` to the serial DMX device."""

    def __init__(self):
        super().__init__()
        self.running = True

    def _sync(self, ser) -> bool:
        """Read lines until the device sends ``Sync.``.

        Returns False if a stop was requested before sync was achieved.
        Reads time out (see SERIAL_TIMEOUT), so the running flag is
        re-checked periodically.
        """
        while self.running:
            if ser.readline().strip() == b"Sync.":
                return True
        return False

    def run(self):
        """Send one 512-byte frame per FRAME_TIME until stopped."""
        with serial.Serial(SERIAL_PORT, SERIAL_BAUD, timeout=SERIAL_TIMEOUT) as ser:
            payload = bytearray(512)

            if not self._sync(ser):
                return
            print("initial sync.")

            while self.running:
                # Monotonic clock: frame pacing must not jump with wall-clock
                # adjustments.
                loop_start = time.monotonic()

                with dmx_data_lock:
                    for (i, value) in enumerate(dmx_data):
                        # Clamp defensively: an out-of-range byte would raise
                        # ValueError and kill this thread.
                        payload[i] = max(DMX_MIN, min(DMX_MAX, value))

                ser.write(payload)
                ser.flush()

                response = ser.readline()
                if response.strip() != b"Ack.":
                    print(f"received bad response: {response!r}")
                    if not self._sync(ser):
                        return
                    continue

                loop_time = time.monotonic() - loop_start
                if loop_time < FRAME_TIME:
                    time.sleep(FRAME_TIME - loop_time)
                else:
                    print("loop took too long!")

                print(
                    f"loop time: {1000 * loop_time:0.2f}ms busy, "
                    f"{1000 * (time.monotonic() - loop_start):0.2f}ms total"
                )

    def stop(self):
        """Ask the thread to exit; it stops after its current read or frame."""
        self.running = False


dmx_writer = DmxWriter()

# Strong references to background tasks: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks: Set[asyncio.Task] = set()


@app.on_event("startup")
async def on_startup():
    """Start the auto-release watcher, the state snapshot task and the writer."""
    _background_tasks.add(asyncio.create_task(socket_manager.watch_auto_release()))
    _background_tasks.add(asyncio.create_task(dmx_watcher()))
    dmx_writer.start()


@app.on_event("shutdown")
async def on_shutdown():
    """Stop background tasks and wait for the writer thread to exit."""
    print("shutdown")
    for task in _background_tasks:
        task.cancel()
    _background_tasks.clear()
    dmx_writer.stop()
    dmx_writer.join()