Implement monitoring infrastructure

This commit is contained in:
2022-09-22 19:27:01 +02:00
parent 2af0136703
commit 7fa635170a
15 changed files with 499 additions and 29 deletions

96
server/dummy-client.py Normal file
View File

@@ -0,0 +1,96 @@
import asyncio
import json
import websockets
ENDPOINT = "ws://localhost:8000/ipmi/computer/8b9faf9f-9470-4a50-b405-0af5f0152550/ws"
def gen_payload(tick: int):
return {
"x": 3,
"y": 4,
"width": 39,
"height": 13,
"blink": True,
"fg": 0,
"text": [
"[WS] OK\u0003 ",
"FG 0123456789ABCDEF ",
"BG 0123456789ABCDEF ",
" ",
f"Tick: {tick:8d} ",
" ",
" ",
" ",
" ",
" ",
" ",
" ",
" ",
],
"fg_color": [
"78870dd50000000000000000000000000000000",
"0000123456789abcdef00000000000000000000",
"000f00000000000000000000000000000000000",
"440000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
"000000000000000000000000000000000000000",
],
"bg_color": [
"fffffffffffffffffffffffffffffffffffffff",
"ffffffffffffffffff0ffffffffffffffffffff",
"fff0123456789abcdefffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
"fffffffffffffffffffffffffffffffffffffff",
],
"palette": [
15790320,
15905331,
15040472,
10072818,
14605932,
8375321,
15905484,
5000268,
10066329,
5020082,
11691749,
3368652,
8349260,
5744206,
13388876,
1118481,
],
}
async def main():
tick = 0
async with websockets.connect(ENDPOINT) as socket:
while True:
await socket.send(json.dumps({"screen": gen_payload(tick)}))
await asyncio.sleep(1 / 20)
tick += 1
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("this handler gets it")

View File

@@ -1,3 +1,4 @@
import asyncio
import json
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
@@ -6,14 +7,20 @@ from .settings import settings
from .user import user_auth
from .map_tiles import map_tiles, map_meta
from .templates import j2env
from .monitoring import monitoring
from .monitoring import monitoring, ws_manager
app = FastAPI()
app.mount("/user/", user_auth)
app.mount("/map/", map_meta)
app.mount("/tiles/", map_tiles)
app.mount("/monitoring/", monitoring)
app.mount("/ipmi/", monitoring)
@app.on_event("startup")
async def on_startup():
asyncio.get_running_loop().create_task(ws_manager.queue_task())
frontend = FastAPI()

View File

@@ -1,10 +1,13 @@
import asyncio
import json
from uuid import UUID
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
monitoring = FastAPI()
class ScreenContent(BaseModel):
x: int
y: int
@@ -15,24 +18,86 @@ class ScreenContent(BaseModel):
text: list[str]
fg_color: list[str]
bg_color: list[str]
palette: list[int]
class Ping(BaseModel):
screen: ScreenContent
@monitoring.post("/ping")
async def ping(request: Request, data: Ping):
print("[PING]")
for line in data.screen.text:
print(line)
class Update(BaseModel):
screen: ScreenContent | None
class WSManager:
def __init__(self):
self.computers: dict[UUID, WebSocket] = dict()
self.viewers: dict[UUID, set[WebSocket]] = dict()
self.queue: asyncio.Queue[tuple[UUID, any]] = asyncio.Queue()
async def queue_task(self):
print("[WS] queue task started")
while True:
(uuid, message) = await self.queue.get()
if uuid not in self.viewers:
continue
viewers = self.viewers[uuid]
await asyncio.gather(*(viewer.send_json(message) for viewer in viewers))
async def broadcast(self, uuid: UUID, message):
await self.queue.put((uuid, message))
async def on_computer_connect(self, socket: WebSocket, uuid: UUID):
if uuid in self.computers:
print(f"[WS] Closing duplicate connection for {uuid}")
await socket.close()
return
print(f"[WS] Computer {uuid} connected")
self.computers[uuid] = socket
while True:
try:
data = await socket.receive_json()
data = Update.parse_obj(data)
if data.screen:
await self.broadcast(uuid, data.screen.dict())
except ValidationError as e:
print(f"[WS] Received invalid message from {uuid}:")
print(e.json)
except WebSocketDisconnect:
break
del self.computers[uuid]
print(f"[WS] Computer {uuid} disconnected")
async def on_browser_connect(self, socket: WebSocket, uuid: UUID):
print(f"[WS] Browser connected for {uuid}")
if uuid not in self.viewers:
self.viewers[uuid] = set()
self.viewers[uuid].add(socket)
while True:
try:
data = await socket.receive_json()
except WebSocketDisconnect:
break
self.viewers[uuid].remove(socket)
print(f"[WS] Browser disconnected for {uuid}")
ws_manager = WSManager()
@monitoring.websocket("/computer/{uuid}/ws")
async def computer_ws(socket: WebSocket, uuid: UUID):
await socket.accept()
print(f"[WS] Computer {uuid} connected")
while True:
try:
data = await socket.receive_json()
#print(f"[WS] rx {json.dumps(data)}")
except WebSocketDisconnect:
break
print(f"[WS] Computer {uuid} disconnected")
await ws_manager.on_computer_connect(socket, uuid)
@monitoring.websocket("/browser/{uuid}/ws")
async def browser_ws(socket: WebSocket, uuid: UUID):
await socket.accept()
await ws_manager.on_browser_connect(socket, uuid)