webpnp/model.py
2020-04-19 16:01:00 +02:00

200 lines
6.0 KiB
Python

import random
import base64
import json
import os
import datetime
def generate_random_id(_s=set()):
while (new_id := base64.b32encode(bytearray(random.randint(0, 0xFF) for _ in range(10)))[:16].decode().lower()) in _s:
pass
_s.add(new_id)
return new_id
class Model(object):
class ApiMethod:
dict = dict()
def __init__(self, fun):
self.dict[fun.__name__] = fun
def __contains__(self, item):
return item in self.dict
def __init__(self, model = {}):
if "sessions" not in model:
model["sessions"] = []
if "clients" not in model:
model["clients"] = []
self.sockets = {} # mapping: clientid -> sockets
self.sessions = { session: Session(model = model["sessions"][session]) for session in model["sessions"] }
self.clients = { client: Client(model = model["clients"][client]) for client in model["clients"] }
self.filename = "tehsession.json"
def to_json(self):
model = {
"sessions": {session: self.sessions[session].to_json() for session in self.sessions },
"clients": {client: self.clients[client].to_json() for client in self.clients },
}
return model
@ApiMethod
async def test_api(self, clientid):
print(f'test_api {clientid=}')
@ApiMethod
async def test_yeet(self, clientid):
raise Exception('yeet')
@ApiMethod
async def change_username(self, clientid, username) -> str:
self.clients[clientid].name = username
@ApiMethod
async def create_session(self, clientid, sessionname) -> str:
if not sessionname:
raise Exception(f"Sessionname cant be empty!")
session = Session(name = sessionname, owner = clientid)
self.sessions[session.id] = session
print("create_session was called")
@ApiMethod
async def change_sessionname(self, clientid, sessionid, sessionname) -> str:
if not sessionname:
raise Exception("Cant be empty!")
session = self.sessions[sessionid]
if not (session.owner == clientid):
raise Exception("Ownly owner can change sessionname")
session.name = sessionname
@ApiMethod
async def leave_session(self, clientid):
client = self.clients[clientid]
sessionid = client.session
if sessionid:
session = self.sessions[sessionid]
session.clients.remove(clientid)
self.clients[clientid].session = ""
@ApiMethod
async def join_session(self, clientid, sessionid):
client = self.clients[clientid]
old_sessionid = client.session
# leave old session
if old_sessionid:
old_session = self.sessions[old_sessionid]
old_session.clients.remove(clientid)
session = self.sessions[sessionid]
client.session = sessionid
session.clients.append(client.id)
async def send_lobby_view(self, clientid):
data = {}
client = self.clients[clientid]
data["view"] = "lobby"
data["username"] = client.name
data["sessions"] = {
session.id: {
"id": session.id,
"name": session.name,
"owned": session.owner == client.id,
} for session in self.sessions.values()
}
for socket in self.sockets[clientid]:
await socket.send_json(data)
async def send_session_view(self, clientid):
data = {}
client = self.clients[clientid]
data["view"] = "session"
data["username"] = client.name
for socket in self.sockets[clientid]:
await socket.send_json(data)
async def send_state(self, clientid):
# TODO: compute state, send to client
data = {}
client = self.clients[clientid]
session = self.sessions[client.session] if client.session else None
if session:
await self.send_session_view(clientid)
else:
await self.send_lobby_view(clientid)
def save(self):
with open(self.filename, "w") as f:
json.dump(self.to_json(), f)
def exists_client(self, clientid: str) -> bool:
return clientid in self.clients
def create_client(self, name="Joe") -> str:
if not name:
raise Exception("Username cannot be empty!")
client = Client()
client.name = name
self.clients[client.id] = client
return client.id
async def subscribe(self, clientid, socket):
if not clientid in self.sockets:
self.sockets[clientid] = []
self.sockets[clientid].append(socket)
await self.send_state(clientid)
def unsubscribe(self, socket):
for client in self.sockets:
if socket in self.sockets[client]:
self.sockets[client].remove(socket)
class Session:
def __init__(self, model = None, owner = None, name = None):
if model:
self.id = model["id"]
self.name = model["name"]
self.clients = model["clients"]
self.owner = model["owner"]
elif owner and name:
self.id = generate_random_id()
self.clients = []
self.owner = owner
self.name = name
else:
raise Exception("Illegal session constructor")
def to_json(self):
model = {
"id": self.id,
"name": self.name,
"clients": self.clients,
"owner": self.owner,
}
return model
class Client:
def __init__(self, model = None):
if model:
self.id = model["id"]
self.name = model["name"]
self.session = model["session"]
else:
self.id = generate_random_id()
self.name = "Default Client Name"
self.session = ""
def to_json(self):
model = {
"id": self.id,
"name": self.name,
"session": self.session,
}
return model