webpnp/model.py
Dominic Zimmer 8fe6fa4884 USERNAMES
2020-04-17 02:48:40 +02:00

116 lines
4.0 KiB
Python

import random
import base64
import json
import os
import datetime
def generate_random_id(_s=set()):
    """Return a fresh 16-character lowercase base32 identifier.

    Ten random bytes are base32-encoded (which yields exactly 16 characters
    with no padding) and lower-cased. A candidate that is already present in
    the registry is discarded and a new one is drawn.

    Args:
        _s: Registry of identifiers issued so far. The mutable default is
            deliberate: it is created once and shared by every call that
            does not pass its own set, which is what keeps the returned ids
            unique for the lifetime of the process.

    Returns:
        The new identifier as ``str``; it is also added to ``_s``.
    """
    while True:
        # os.urandom reads the operating system's CSPRNG, so the ids are not
        # predictable the way output of the default `random` generator is.
        new_id = base64.b32encode(os.urandom(10)).decode().lower()
        if new_id not in _s:
            break
    _s.add(new_id)
    return new_id
class Model(object):
    """Application state (clients and sessions) persisted as a JSON file.

    ``self.model`` is a plain dict with the top-level keys ``"clients"`` and
    ``"sessions"``; each maps a generated id to a record dict.
    ``self.sockets`` maps a client id to the list of sockets subscribed for
    that client. It lives in memory only and is not written by ``save``.
    """

    class ApiMethod:
        """Decorator that registers a function by name in a shared registry.

        The decorated name is rebound to the ``ApiMethod`` instance, which is
        not callable itself; the original function stays reachable through
        ``ApiMethod.dict[name]`` and must be called with the model instance
        as its first argument.
        """

        # Class-level registry shared by all instances: function name -> function.
        dict = dict()

        def __init__(self, fun):
            self.dict[fun.__name__] = fun

        def __contains__(self, item):
            """Return whether ``item`` is the name of a registered function."""
            return item in self.dict

    def __init__(self, filename="tehmodel.json"):
        """Load the model from ``filename``, or start empty.

        A missing file, or a file that cannot be decoded as JSON, results in
        an empty model instead of an error.

        Args:
            filename: Path of the JSON file used by ``save`` and loaded here.
        """
        self.sockets = {}  # mapping: client id -> list of subscribed sockets
        self.filename = filename
        self.model = {}
        if os.path.isfile(filename):
            try:
                with open(filename) as f:
                    self.model = json.load(f)
            except ValueError:
                # json.JSONDecodeError and UnicodeDecodeError are both
                # ValueError subclasses; an unreadable file means "start
                # fresh", without also swallowing KeyboardInterrupt etc.
                self.model = {}
        self.assert_model()

    def assert_model(self):
        """Ensure ``self.model`` is a dict with "clients" and "sessions"."""
        if not isinstance(self.model, dict):
            # Valid JSON that is not an object (e.g. a list or null) cannot
            # hold the expected keys, so it is treated like an empty model.
            self.model = {}
        self.model.setdefault("clients", {})
        self.model.setdefault("sessions", {})

    @ApiMethod
    async def test_api(self, clientid):
        """Print the calling client id."""
        print(f'test_api {clientid=}')

    @ApiMethod
    async def test_yeet(self, clientid):
        """Always raise ``Exception('yeet')``."""
        # NOTE(review): presumably exists to exercise the caller's error
        # handling - confirm against the dispatcher.
        raise Exception('yeet')

    @ApiMethod
    async def create_session(self, clientid) -> str:
        """Create a session owned by ``clientid`` and push the new state.

        The owner is recorded on the session but is not added to the
        session's ``"clients"`` list.

        Returns:
            The id of the newly created session.
        """
        sessionname = generate_random_id()
        self.model["sessions"][sessionname] = {
            "id": sessionname,
            "owner": clientid,
            "clients": [],
        }
        await self.send_state(clientid)
        return sessionname

    @ApiMethod
    async def join_session(self, clientid, sessionid):
        """Move ``clientid`` into ``sessionid`` and push the new state.

        Raises:
            Exception: If ``sessionid`` does not exist.
            KeyError: If ``clientid`` is not a known client.
        """
        sessions = self.model["sessions"]
        if sessionid not in sessions:
            raise Exception(f"Session {sessionid} does not exist")
        client = self.model["clients"][clientid]
        # Leave the previous session. The model is loaded from disk, so the
        # stored session may no longer exist or may not list this client;
        # neither case should prevent joining the new session.
        previous = sessions.get(client.get("session"))
        if previous is not None and clientid in previous["clients"]:
            previous["clients"].remove(clientid)
        sessions[sessionid]["clients"].append(clientid)
        client["session"] = sessionid
        await self.send_state(clientid)

    async def send_state(self, clientid):
        """Send the current state to every socket subscribed for the client.

        The payload holds the client's current session id (or
        ``"No session"``), the ids of all sessions, and the client's name
        (``"Joe"`` when none is stored).

        Raises:
            KeyError: If ``clientid`` is not a known client.
        """
        # TODO: compute state, send to client
        client = self.model["clients"][clientid]
        data = {
            "currentsession": client.get("session", "No session"),
            "allsessions": list(self.model["sessions"]),
            "username": client.get("name", "Joe"),
        }
        # Iterate over a snapshot: `unsubscribe` may run while a send is
        # awaited and would otherwise mutate the list being iterated. A
        # client without any subscribed socket simply receives nothing.
        for socket in tuple(self.sockets.get(clientid, ())):
            await socket.send_json(data)

    def save(self):
        """Write the model to ``self.filename`` and a timestamped backup.

        The backup is written to ``backups/<timestamp>_<basename>`` relative
        to the current working directory. If ``backups`` exists but is not a
        directory, a message is printed and the backup is skipped.
        """
        # Serialize first so that a serialization error cannot leave a
        # truncated model file behind.
        payload = json.dumps(self.model)
        with open(self.filename, "w") as f:
            f.write(payload)
        try:
            os.makedirs("backups", exist_ok=True)
        except FileExistsError:
            print("backups is a file, no directory. Please delete yourself")
            return
        datestring = datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S")
        # Use only the base name so a filename with directory components
        # still yields a path directly inside backups/.
        backup_name = f"{datestring}_{os.path.basename(self.filename)}"
        with open(os.path.join("backups", backup_name), "w") as f:
            f.write(payload)

    def exists_client(self, clientid: str) -> bool:
        """Return whether ``clientid`` is a known client."""
        return clientid in self.model["clients"]

    def create_client(self, name="Joe") -> str:
        """Register a new client called ``name`` and return its generated id."""
        clientname = generate_random_id()
        self.model["clients"][clientname] = {"id": clientname, "name": name}
        return clientname

    async def subscribe(self, clientid, socket):
        """Register ``socket`` for ``clientid`` and send it the current state."""
        self.sockets.setdefault(clientid, []).append(socket)
        await self.send_state(clientid)

    def unsubscribe(self, socket):
        """Remove ``socket`` from every client's subscription list."""
        for subscribed in self.sockets.values():
            if socket in subscribed:
                subscribed.remove(socket)