import base64
import datetime
import json
import os
import random
import secrets


def generate_random_id(_s=set()):
    """Return a new 16-character lowercase base32 identifier.

    The identifier encodes 10 random bytes, which base32 turns into exactly
    16 characters without padding.

    ``_s`` is a deliberately shared mutable default: it remembers every id
    handed out by this function during the process lifetime so that the same
    id is never returned twice. Ids loaded from persisted models are not
    registered in it.
    """
    while True:
        # Ids double as client identifiers, so draw them from the OS CSPRNG
        # instead of the predictable ``random`` generator.
        new_id = base64.b32encode(secrets.token_bytes(10)).decode().lower()
        if new_id not in _s:
            _s.add(new_id)
            return new_id


class Model(object):
    """In-memory state (sessions, clients, items) plus the client-facing API.

    Methods decorated with ``@ApiMethod`` are recorded in ``ApiMethod.dict``
    under their function name. ``sockets`` maps a client id to the list of
    sockets subscribed for that client; each socket must offer an awaitable
    ``send_json(data)``.
    """

    class ApiMethod:
        """Decorator that registers a function in a registry keyed by name.

        Note that the decorated class attribute becomes this ``ApiMethod``
        instance rather than the function itself; the plain function is
        reachable through ``ApiMethod.dict[name]`` and must be called with the
        model instance as its first argument.
        """

        # Shared registry: function name -> undecorated function. The
        # attribute name shadows the builtin but is kept because code outside
        # this file may look methods up through it.
        dict = {}

        def __init__(self, fun):
            self.dict[fun.__name__] = fun

        def __contains__(self, item):
            return item in self.dict

    def __init__(self, model=None):
        """Build the model, optionally restoring it from ``to_json`` output.

        ``model`` may contain the keys ``"sessions"``, ``"clients"`` and
        ``"items"``, each mapping an id to the JSON form of that object.
        Missing or empty sections are treated as empty. The passed dict is
        not modified.
        """
        if model is None:
            model = {}
        stored_sessions = model.get("sessions") or {}
        stored_clients = model.get("clients") or {}
        stored_items = model.get("items") or {}

        self.sockets = {}  # mapping: clientid -> sockets
        self.sessions = {
            key: Session(model=value) for key, value in stored_sessions.items()
        }
        self.clients = {
            key: Client(model=value) for key, value in stored_clients.items()
        }
        self.items = {
            key: Item(model=value) for key, value in stored_items.items()
        }
        self.filename = "tehsession.json"

    def to_json(self):
        """Return the whole model as nested dicts keyed by object id."""
        return {
            "sessions": {
                key: session.to_json() for key, session in self.sessions.items()
            },
            "clients": {
                key: client.to_json() for key, client in self.clients.items()
            },
            "items": {key: item.to_json() for key, item in self.items.items()},
        }

    @ApiMethod
    async def test_api(self, clientid):
        """Print the calling client id."""
        print(f'test_api {clientid=}')

    @ApiMethod
    async def test_yeet(self, clientid):
        """Always raise."""
        raise Exception('yeet')

    @ApiMethod
    async def change_username(self, clientid, username) -> None:
        """Rename a client.

        Raises ``Exception`` for an empty name (the same rule and message as
        ``create_client``) and ``KeyError`` for an unknown client.
        """
        if not username:
            raise Exception("Username cannot be empty!")
        self.clients[clientid].name = username

    @ApiMethod
    async def create_session(self, clientid, sessionname) -> None:
        """Create a session owned by ``clientid``.

        The owner is recorded on the session but is not added to its client
        list. Raises ``Exception`` when ``sessionname`` is empty.
        """
        if not sessionname:
            raise Exception(f"Sessionname cant be empty!")
        session = Session(name=sessionname, owner=clientid)
        self.sessions[session.id] = session
        print("create_session was called")

    @ApiMethod
    async def change_sessionname(self, clientid, sessionid, sessionname) -> None:
        """Rename a session; only its owner may do so.

        Raises ``Exception`` for an empty name or a non-owner caller and
        ``KeyError`` for an unknown session.
        """
        if not sessionname:
            raise Exception("Cant be empty!")
        session = self.sessions[sessionid]
        if session.owner != clientid:
            raise Exception("Ownly owner can change sessionname")
        session.name = sessionname

    @ApiMethod
    async def create_item(self, clientid, name, description, image):
        """Create an item on behalf of a client that owns its current session.

        Raises ``Exception`` when the client is in no session, does not own
        the session it is in, or ``name`` is empty.
        """
        client = self.clients[clientid]
        if not client.session:
            raise Exception("create_item requires the client to be host. But client is in no session")
        session = self.sessions[client.session]
        if session.owner != client.id:
            raise Exception("create_item requires the client to be host. But client is not the owner")
        if not name:
            raise Exception("create_item requires an item name")
        item = Item(name=name, description=description, image=image)
        self.items[item.id] = item

    @ApiMethod
    async def leave_session(self, clientid):
        """Remove the client from its current session, if it has one.

        A membership that is already inconsistent (the session is gone, or it
        does not list the client) is cleaned up instead of raising, so the
        client always ends up in no session.
        """
        client = self.clients[clientid]
        self._detach_from_session(clientid, client.session)
        client.session = ""

    @ApiMethod
    async def join_session(self, clientid, sessionid):
        """Move the client into ``sessionid``, leaving any previous session.

        Raises ``KeyError`` for an unknown client or session.
        """
        client = self.clients[clientid]
        # Resolve the target before touching any state, so an unknown session
        # id fails without removing the client from its current session.
        session = self.sessions[sessionid]
        # leave old session
        self._detach_from_session(clientid, client.session)
        client.session = sessionid
        if client.id not in session.clients:
            session.clients.append(client.id)

    @ApiMethod
    async def draw(self, clientid):
        """Send the client's current view to its sockets."""
        await self.send_state(clientid)

    def _detach_from_session(self, clientid, sessionid):
        """Drop ``clientid`` from the client list of ``sessionid`` if listed."""
        if not sessionid:
            return
        session = self.sessions.get(sessionid)
        if session is not None and clientid in session.clients:
            session.clients.remove(clientid)

    async def _send(self, clientid, data):
        """Send ``data`` to every socket currently subscribed for the client."""
        # Iterate over a snapshot: ``unsubscribe`` may change the list while
        # a send is being awaited. A client without sockets gets nothing.
        for socket in list(self.sockets.get(clientid, ())):
            await socket.send_json(data)

    async def send_lobby_view(self, clientid):
        """Send the lobby view: the user name and a summary of all sessions."""
        client = self.clients[clientid]
        data = {
            "view": "lobby",
            "username": client.name,
            "sessions": {
                session.id: {
                    "id": session.id,
                    "name": session.name,
                    "owned": session.owner == client.id,
                }
                for session in self.sessions.values()
            },
        }
        await self._send(clientid, data)

    async def send_master_view(self, clientid):
        """Send the master view: the client's session plus all items."""
        client = self.clients[clientid]
        session = self.sessions[client.session]
        data = {
            "view": "master",
            "username": client.name,
            "session": session.to_json(),
            "items": {key: item.to_json() for key, item in self.items.items()},
        }
        await self._send(clientid, data)

    async def send_session_view(self, clientid):
        """Send the session view: the client's session without the items."""
        client = self.clients[clientid]
        session = self.sessions[client.session]
        data = {
            "view": "session",
            "username": client.name,
            "session": session.to_json(),
        }
        await self._send(clientid, data)

    async def send_state(self, clientid):
        """Send the view that matches the client's situation.

        The owner of the session it is in gets the master view, any other
        session member gets the session view, and a client that is in no
        session (or whose session id is unknown) gets the lobby view.
        """
        # TODO: compute state, send to client
        client = self.clients[clientid]
        session = self.sessions.get(client.session) if client.session else None
        if session is None:
            await self.send_lobby_view(clientid)
        elif session.owner == client.id:
            await self.send_master_view(clientid)
        else:
            await self.send_session_view(clientid)

    def save(self):
        """Write the JSON form of the model to ``self.filename``."""
        with open(self.filename, "w", encoding="utf-8") as f:
            json.dump(self.to_json(), f)

    def exists_client(self, clientid: str) -> bool:
        """Return whether a client with this id is known."""
        return clientid in self.clients

    def create_client(self, name="Joe") -> str:
        """Register a new client and return its id.

        Raises ``Exception`` when ``name`` is empty.
        """
        if not name:
            raise Exception("Username cannot be empty!")
        client = Client()
        client.name = name
        self.clients[client.id] = client
        return client.id

    async def subscribe(self, clientid, socket):
        """Attach ``socket`` to the client and send it the current view."""
        self.sockets.setdefault(clientid, []).append(socket)
        await self.send_state(clientid)

    def unsubscribe(self, socket):
        """Detach ``socket`` from every client it is subscribed for."""
        for subscribed in self.sockets.values():
            if socket in subscribed:
                subscribed.remove(socket)


class Session:
    """A named session with an owner and a list of member client ids."""

    def __init__(self, model=None, owner=None, name=None):
        """Restore from a ``to_json`` dict, or create from ``owner`` and ``name``.

        Raises ``Exception`` when neither a non-empty ``model`` nor both
        ``owner`` and ``name`` are given.
        """
        if model:
            self.id = model["id"]
            self.name = model["name"]
            # Copy so later membership changes do not alter the caller's data.
            self.clients = list(model["clients"])
            self.owner = model["owner"]
        elif owner and name:
            self.id = generate_random_id()
            self.clients = []
            self.owner = owner
            self.name = name
        else:
            raise Exception("Illegal session constructor")

    def to_json(self):
        """Return the session as a JSON-serialisable dict."""
        return {
            "id": self.id,
            "name": self.name,
            # Snapshot, so the returned dict does not change afterwards.
            "clients": list(self.clients),
            "owner": self.owner,
        }


class Client:
    """A client with an id, a display name and the id of its session."""

    def __init__(self, model=None):
        """Restore from a ``to_json`` dict, or create a fresh client.

        A fresh client gets a new random id, a placeholder name and an empty
        session id, which means "in no session".
        """
        if model:
            self.id = model["id"]
            self.name = model["name"]
            self.session = model["session"]
        else:
            self.id = generate_random_id()
            self.name = "Default Client Name"
            self.session = ""

    def to_json(self):
        """Return the client as a JSON-serialisable dict."""
        return {
            "id": self.id,
            "name": self.name,
            "session": self.session,
        }


class Item:
    """An item with an id, a name, a description and an image value."""

    def __init__(self, model=None, name=None, description="", image=""):
        """Restore from a ``to_json`` dict, or create a new item from ``name``.

        Raises ``Exception`` when neither a non-empty ``model`` nor a
        non-empty ``name`` is given.
        """
        if model:
            self.id = model["id"]
            self.name = model["name"]
            self.description = model["description"]
            self.image = model["image"]
        elif name:
            self.id = generate_random_id()
            self.name = name
            self.description = description
            self.image = image
        else:
            raise Exception("Illegal Item Constructor")

    def to_json(self):
        """Return the item as a JSON-serialisable dict."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "image": self.image,
        }