webpnp/model.py
Dominic Zimmer 457d20bd44 Write model
2020-04-17 01:58:49 +02:00

95 lines
2.9 KiB
Python

import random
import base64
import json
import os
import datetime
def generate_random_id(_s=set()):
while (new_id := base64.b32encode(bytearray(random.randint(0, 0xFF) for _ in range(10)))[:16].decode().lower()) in _s:
pass
_s.add(new_id)
return new_id
class Model(object):
global api_methods
api_methods = {}
def api_method():
global api_methods
def wrapper(fun):
api_methods[fun.__name__] = fun
return wrapper
def __init__(self, filename = "tehmodel.json"):
self.sockets = {} # mapping: client -> socket
self.filename = filename
self.model = None
if os.path.isfile(filename):
with open(filename) as f:
try:
self.model = json.load(f)
except:
self.model = {}
else:
self.model = {}
self.assert_model()
def assert_model(self):
if not "clients" in self.model:
self.model["clients"] = {}
if not "sessions" in self.model:
self.model["sessions"] = {}
@api_method()
async def create_session(self, clientid) -> str:
sessionname = generate_random_id()
newsession = {"id": sessionname, "owner": clientid, "clients": []}
self.model["sessions"][sessionname] = newsession
@api_method()
async def join_session(self, clientid, sessionid):
if sessionid in self.model["sessions"]:
self.model["sessions"][sessionid].append(clientid)
else:
raise Exception(f"Session {sessionid} does not exist")
async def send_state(self, clientid):
# TODO: compute state, send to client
data = {"message": "nudes"}
for socket in self.sockets[clientid]:
await socket.send_json(data)
def save(self):
with open(self.filename, "w") as f:
json.dump(self.model, f)
if not os.path.isdir("backups"):
try:
os.mkdir("backups")
except FileExistsError:
print("backups is a file, no directory. Please delete yourself")
datestring = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H%M%S")
with open(f"backups/{datestring}_{self.filename}", "w") as f:
json.dump(self.model, f)
def exists_client(self, clientid: str) -> bool:
return clientid in self.model["clients"]
def create_client(self) -> str:
clientname = generate_random_id()
newclient = {"id": clientname}
self.model["clients"][clientname] = newclient
return clientname
def subscribe(self, clientid, socket):
if not clientid in self.sockets:
self.sockets[clientid] = []
self.sockets[clientid].append(socket)
def unsubscribe(self, socket):
for client in self.sockets:
if socket in self.sockets[client]:
self.sockets[client].remove(socket)