"""WebSocket server that shares a bank of sliders between browser clients.

A client must "grab" a slider before it may "move" it.  Ownership is given
up on an explicit "release", when the owner disconnects, or automatically
after ``AUTO_RELEASE_SECONDS`` without a grab/move on that slider.  After
every change each connected client receives the full slider state.
"""

import asyncio
# NOTE(review): wildcard import kept for compatibility; this module itself
# only needs Literal, Optional, Set and Union from it.
from typing import *

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field, ValidationError

app = FastAPI()

# Number of sliders held in ``dmx_state``.
NUM_SLIDERS = 8
# Seconds of inactivity after which an owned slider is released automatically.
AUTO_RELEASE_SECONDS = 1


class Slider:
    """State of one slider: its value, its owner and its inactivity timer."""

    def __init__(self):
        self.value = 0
        # Socket currently allowed to move this slider, or None when free.
        self.owner: Optional[WebSocket] = None
        # Pending inactivity timer task, or None when no timer is armed.
        self.release_timer: Optional[asyncio.Task] = None
        # Set when the slider should be released by the watcher task.
        # NOTE(review): sliders are built at import time; on Python < 3.10
        # asyncio.Event binds to an event loop at construction -- confirm the
        # server runs on that loop (or on Python >= 3.10).
        self.release_event = asyncio.Event()

    def cancel_release_timer(self):
        """Disarm the inactivity timer and drop any not-yet-handled expiry."""
        if self.release_timer is not None:
            self.release_timer.cancel()
            self.release_timer = None
        # A timer that already fired leaves the event set; clear it so a stale
        # expiry cannot evict whoever grabs the slider next.
        self.release_event.clear()

    def reset_release_timer(self):
        """(Re)start the inactivity timer from zero."""
        self.cancel_release_timer()
        self.release_timer = asyncio.create_task(self._release_timer())

    async def _release_timer(self):
        """Sleep for the inactivity timeout, then request a release."""
        await asyncio.sleep(AUTO_RELEASE_SECONDS)
        self.release_event.set()


dmx_state = [Slider() for _ in range(NUM_SLIDERS)]


class GrabAction(BaseModel):
    """Client request to take ownership of a slider."""

    action_type: Literal["grab"]
    slider: int


class ReleaseAction(BaseModel):
    """Client request to give up ownership of a slider."""

    action_type: Literal["release"]
    slider: int


class MoveAction(BaseModel):
    """Client request to set the value of a slider it owns."""

    action_type: Literal["move"]
    slider: int
    new_value: int


class ClientAction(BaseModel):
    """Envelope for one client message, discriminated by ``action_type``."""

    action: Union[GrabAction, ReleaseAction, MoveAction] = Field(
        ..., discriminator="action_type"
    )


class SocketManager:
    """Tracks connected sockets and applies their actions to ``dmx_state``."""

    def __init__(self):
        self.sockets: Set[WebSocket] = set()

    async def on_connect(self, ws: WebSocket):
        """Register ``ws`` and send it the current slider state."""
        self.sockets.add(ws)
        await self.push_state(ws)

    def on_disconnect(self, ws: WebSocket):
        """Unregister ``ws`` and free every slider it owned.

        Safe to call for a socket that was never registered.
        """
        self.sockets.discard(ws)
        for slider in dmx_state:
            if slider.owner == ws:
                slider.owner = None
                slider.cancel_release_timer()
                # This method is synchronous, so hand the broadcast to the
                # watcher task by signalling an immediate release.
                slider.release_event.set()

    async def on_action(
        self, ws: WebSocket, action: Union[GrabAction, ReleaseAction, MoveAction]
    ):
        """Apply one client action, then broadcast the state to everyone.

        Actions naming a slider index outside ``dmx_state`` are logged and
        ignored; actions on a slider the sender does not own (or, for grab,
        that is already owned) change nothing but still trigger a broadcast.
        """
        # Reject bad indices explicitly: an IndexError would tear down the
        # connection and a negative index would address the wrong slider.
        if not 0 <= action.slider < len(dmx_state):
            print(f"ignoring {action.action_type} for unknown slider {action.slider}")
            return

        slider = dmx_state[action.slider]
        if action.action_type == "grab":
            print(f"grab {action.slider}")
            if slider.owner is None:
                slider.owner = ws
                slider.reset_release_timer()
        elif action.action_type == "release":
            print(f"release {action.slider}")
            if slider.owner == ws:
                slider.owner = None
                slider.cancel_release_timer()
        elif action.action_type == "move":
            print(f"move {action.slider} -> {action.new_value}")
            if slider.owner == ws:
                slider.value = action.new_value
                slider.reset_release_timer()
        await self.push_all()

    async def push_state(self, ws: WebSocket):
        """Send ``ws`` every slider's value and its status relative to ``ws``.

        Status is "owned" (by ``ws``), "locked" (by another socket) or "open".
        """
        response = []
        for slider in dmx_state:
            if slider.owner == ws:
                status = "owned"
            elif slider.owner is not None:
                status = "locked"
            else:
                status = "open"
            response.append({"value": slider.value, "status": status})
        await ws.send_json(response)

    async def push_all(self):
        """Send the current state to every registered socket.

        A failed send to one socket is logged and does not prevent delivery
        to the others or raise into the caller.
        """
        targets = list(self.sockets)
        results = await asyncio.gather(
            *(self.push_state(ws) for ws in targets), return_exceptions=True
        )
        for result in results:
            if isinstance(result, BaseException):
                print(f"push failed: {result!r}")

    async def watch_auto_release(self):
        """Run forever, releasing sliders whose release event gets set."""

        async def _watch(slider):
            while True:
                await slider.release_event.wait()
                # wait() also returns when the event was set and then cleared
                # before this task resumed; such an expiry is stale.
                if not slider.release_event.is_set():
                    continue
                print("resetteroni")
                slider.release_event.clear()
                slider.owner = slider.release_timer = None
                await self.push_all()

        await asyncio.gather(*[_watch(slider) for slider in dmx_state])


socket_manager = SocketManager()


@app.websocket("/ws")
async def ws_handler(ws: WebSocket):
    """Serve one client: accept, register, then process actions until it leaves."""
    await ws.accept()
    try:
        # Registration is inside the try so a failed initial push still
        # reaches the cleanup in ``finally``.
        await socket_manager.on_connect(ws)
        while True:
            data = await ws.receive_json()
            try:
                action = ClientAction.parse_obj(data)
                await socket_manager.on_action(ws, action.action)
            except ValidationError as e:
                print(e)
    except WebSocketDisconnect:
        # Normal end of a session; cleanup happens below.
        pass
    finally:
        socket_manager.on_disconnect(ws)


app.mount("/", StaticFiles(directory="frontend", html=True))

# Strong references to fire-and-forget tasks; the event loop itself only
# keeps weak references to tasks.
_background_tasks = set()


@app.on_event("startup")
async def on_startup():
    """Start the auto-release watcher when the application starts."""
    task = asyncio.create_task(socket_manager.watch_auto_release())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)