import random import base64 import json import os import datetime def generate_random_id(_s=set()): while (new_id := base64.b32encode(bytearray(random.randint(0, 0xFF) for _ in range(10)))[:16].decode().lower()) in _s: pass _s.add(new_id) return new_id class Model(object): class ApiMethod: dict = dict() def __init__(self, fun): self.dict[fun.__name__] = fun def __contains__(self, item): return item in self.dict def __init__(self, filename="tehmodel.json"): self.sockets = {} # mapping: client -> socket self.filename = filename self.model = None if os.path.isfile(filename): with open(filename) as f: try: self.model = json.load(f) except: self.model = {} else: self.model = {} self.assert_model() def assert_model(self): if not "clients" in self.model: self.model["clients"] = {} if not "sessions" in self.model: self.model["sessions"] = {} for session in self.model["sessions"]: if not "name" in self.model["sessions"][session]: self.model["sessions"][session]["name"] = "Defaultname" @ApiMethod async def test_api(self, clientid): print(f'test_api {clientid=}') @ApiMethod async def test_yeet(self, clientid): raise Exception('yeet') @ApiMethod async def change_username(self, clientid, username) -> str: self.model["clients"][clientid]["username"] = username await self.send_state(clientid) @ApiMethod async def create_session(self, clientid, sessionname) -> str: if not sessionname: raise Exception("Cant be empty!") sessionid = generate_random_id() newsession = {"id": sessionid, "owner": clientid, "clients": [], "name": sessionname } self.model["sessions"][sessionid] = newsession await self.send_state(clientid) @ApiMethod async def change_sessionname(self, clientid, sessionid, sessionname) -> str: if not sessionname: raise Exception("Cant be empty!") if self.model["sessions"][sessionid]["owner"] == clientid: self.model["sessions"][sessionid]["name"] = sessionname await self.send_state(clientid) @ApiMethod async def leave_session(self, clientid): sessionid = self.model["clients"][clientid]["session"] del self.model["clients"][clientid]["session"] self.model["sessions"][sessionid]["clients"].remove(clientid) await self.send_state(clientid) @ApiMethod async def join_session(self, clientid, sessionid): if sessionid in self.model["sessions"]: # session exists self.model["clients"][clientid]["session"] = sessionid if not clientid in self.model["sessions"][sessionid]["clients"]: self.model["sessions"][sessionid]["clients"].append(clientid) await self.send_state(clientid) async def send_state(self, clientid): # TODO: compute state, send to client data = {} if "session" in self.model["clients"][clientid]: sessionid = self.model["clients"][clientid]["session"] data["session"] = {"id": sessionid, "name": self.model["sessions"][sessionid]["name"] } allsessions = { name: { "id": name, "name": self.model["sessions"][name]["name"] } for name in self.model["sessions"] } for session in allsessions: if self.model["sessions"][session]["owner"] == clientid: allsessions[session]["owner"] = True data["allsessions"] = allsessions data["username"] = self.model["clients"][clientid]["username"] if "username" in self.model["clients"][clientid] else "Joe" for socket in self.sockets[clientid]: await socket.send_json(data) def save(self): with open(self.filename, "w") as f: json.dump(self.model, f) if not os.path.isdir("backups"): try: os.mkdir("backups") except FileExistsError: print("backups is a file, no directory. Please delete yourself") datestring = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H%M%S") with open(f"backups/{datestring}_{self.filename}", "w") as f: json.dump(self.model, f) def exists_client(self, clientid: str) -> bool: return clientid in self.model["clients"] def create_client(self, name="Joe") -> str: if not name: raise Exception("Username cannot be empty!") clientname = generate_random_id() newclient = {"id": clientname, "username": name, "sessions": []} self.model["clients"][clientname] = newclient return clientname async def subscribe(self, clientid, socket): if not clientid in self.sockets: self.sockets[clientid] = [] self.sockets[clientid].append(socket) await self.send_state(clientid) def unsubscribe(self, socket): for client in self.sockets: if socket in self.sockets[client]: self.sockets[client].remove(socket)