349 lines
12 KiB
Python
349 lines
12 KiB
Python
import random
|
|
import base64
|
|
import json
|
|
import os
|
|
import datetime
|
|
|
|
|
|
def generate_random_id(_s=set()):
    """Return a fresh 16-character lowercase base32 identifier.

    Ten random bytes are base32-encoded, which yields exactly 16 characters
    with no padding. The bytes come from ``os.urandom`` rather than the
    ``random`` module so that identifiers cannot be predicted from earlier
    ones.

    Args:
        _s: Registry of identifiers already handed out. The mutable default
            is deliberate: it persists between calls and so guarantees
            uniqueness for the lifetime of the process. Identifiers that
            were not produced by this function (for example ones restored
            from a saved model) are not known to the registry.

    Returns:
        The new identifier, which has also been added to ``_s``.
    """
    while True:
        new_id = base64.b32encode(os.urandom(10)).decode().lower()
        if new_id not in _s:
            break
    _s.add(new_id)
    return new_id
|
|
|
|
|
|
class Model(object):
    """Application state: sessions, clients, items and open client sockets.

    Functions decorated with ``@ApiMethod`` are registered by name in
    ``Model.ApiMethod.dict``. The decorated attribute itself is replaced by
    a (non-callable) ``ApiMethod`` instance, so those functions have to be
    invoked through the registry, e.g.
    ``Model.ApiMethod.dict["draw"](model, clientid)``.
    NOTE(review): the dispatcher is not part of this file - confirm that it
    calls the registry this way.
    """

    class ApiMethod:
        """Decorator that records the decorated function under its name."""

        # Shared registry: function name -> undecorated function.
        dict = {}

        def __init__(self, fun):
            self.dict[fun.__name__] = fun

        def __contains__(self, item):
            return item in self.dict

    def __init__(self, model=None):
        """Build the state from *model*, a mapping shaped like ``to_json()``.

        Args:
            model: Optional mapping with the keys ``"sessions"``,
                ``"clients"`` and ``"items"``, each mapping an id to the
                serialised object. Missing or empty sections are treated as
                empty. The mapping is not modified.
        """
        if model is None:
            model = {}
        stored_sessions = model.get("sessions") or {}
        stored_clients = model.get("clients") or {}
        stored_items = model.get("items") or {}

        self.sockets = {}  # mapping: clientid -> sockets
        self.sessions = {
            key: Session(model=stored_sessions[key]) for key in stored_sessions
        }
        self.clients = {
            key: Client(model=stored_clients[key]) for key in stored_clients
        }
        self.items = {key: Item(model=stored_items[key]) for key in stored_items}
        self.filename = "tehsession.json"

    def to_json(self):
        """Return the JSON-serialisable form of all sessions, clients, items."""
        return {
            "sessions": {key: obj.to_json() for key, obj in self.sessions.items()},
            "clients": {key: obj.to_json() for key, obj in self.clients.items()},
            "items": {key: obj.to_json() for key, obj in self.items.items()},
        }

    @ApiMethod
    async def test_api(self, clientid):
        """Debug endpoint: print the calling client id."""
        print(f'test_api {clientid=}')

    @ApiMethod
    async def test_yeet(self, clientid):
        """Debug endpoint: always raises."""
        raise Exception('yeet')

    @ApiMethod
    async def change_username(self, clientid, username) -> None:
        """Rename the client; an empty name is rejected like in create_client."""
        if not username:
            raise Exception("Username cannot be empty!")
        self.clients[clientid].name = username

    @ApiMethod
    async def create_session(self, clientid, sessionname) -> None:
        """Create a session owned by *clientid* and store it under its id.

        Raises:
            Exception: if *sessionname* is empty.
        """
        if not sessionname:
            raise Exception("Sessionname cant be empty!")
        session = Session(name=sessionname, owner=clientid)
        self.sessions[session.id] = session
        print("create_session was called")

    @ApiMethod
    async def change_sessionname(self, clientid, sessionid, sessionname) -> None:
        """Rename a session; only its owner may do so.

        Raises:
            Exception: if the name is empty or the caller is not the owner.
            KeyError: if *sessionid* is unknown.
        """
        if not sessionname:
            raise Exception("Cant be empty!")
        session = self.sessions[sessionid]
        if session.owner != clientid:
            raise Exception("Ownly owner can change sessionname")
        session.name = sessionname

    @ApiMethod
    async def create_item(self, clientid, name, description, image):
        """Create an item; the caller must own the session it is currently in.

        Raises:
            Exception: if the caller is in no session, is not the owner of
                its session, or *name* is empty.
        """
        client = self.clients[clientid]
        if not client.session:
            raise Exception("create_item requires the client to be host. But client is in no session")
        session = self.sessions[client.session]
        if session.owner != client.id:
            raise Exception("create_item requires the client to be host. But client is not the owner")
        if not name:
            raise Exception("create_item requires an item name")
        item = Item(name=name, description=description, image=image)
        self.items[item.id] = item

    @ApiMethod
    async def leave_session(self, clientid):
        """Remove the client from its current session, if it is in one."""
        client = self.clients[clientid]
        sessionid = client.session
        if sessionid:
            session = self.sessions[sessionid]
            # Tolerate a member list that is already missing the client so
            # the client's own session field is always cleared.
            if clientid in session.clients:
                session.clients.remove(clientid)
            client.session = ""

    @ApiMethod
    async def join_session(self, clientid, sessionid):
        """Move the client into *sessionid*, leaving any previous session.

        Raises:
            KeyError: if *clientid* or *sessionid* is unknown. In that case
                no state has been modified.
        """
        client = self.clients[clientid]
        # Resolve the target before touching the old session so an unknown
        # session id cannot leave the client half-removed.
        session = self.sessions[sessionid]

        old_sessionid = client.session
        if old_sessionid:
            old_session = self.sessions[old_sessionid]
            if clientid in old_session.clients:
                old_session.clients.remove(clientid)

        client.session = sessionid
        session.clients.append(client.id)

    def _find_client_by_name(self, name):
        """Return the first client whose name equals *name*, or ``None``."""
        return next(
            (candidate for candidate in self.clients.values() if candidate.name == name),
            None,
        )

    @ApiMethod
    async def move_item(self, clientid, fromplayer, toplayer, itemid, toslot, fromslot):
        """Move an item between inventories of the caller's session.

        ``fromplayer``/``toplayer`` are client *names*; the special name
        ``"master"`` stands for the item pool: moving from master puts a new
        copy of the item into the target inventory, moving to master removes
        it from the source inventory. Slots are 1-based.

        Raises:
            Exception: if a named player does not exist, the caller is in no
                session or is not its owner, the item is unknown or not
                owned by the source player, or a slot is out of range or
                does not hold the item.
        """
        client = self.clients[clientid]
        fromclient = self._find_client_by_name(fromplayer)
        toclient = self._find_client_by_name(toplayer)

        from_master = fromplayer == "master"
        to_master = not from_master and toplayer == "master"

        if from_master:
            if not toclient:
                raise Exception("to-client is illegal")
        elif to_master:
            if not fromclient:
                raise Exception("from-client is illegal")
        elif not fromclient or not toclient:
            # Both ends are needed for a player-to-player move.
            raise Exception("from- or to-client are illegal")

        # An empty session id must yield the domain error, not a KeyError.
        if not client.session:
            raise Exception("move item must be used in session")
        session = self.sessions[client.session]
        if client.id != session.owner:
            raise Exception("Only owner can move items")

        inventories = session.inventories

        if from_master:
            # Inventories are rendered by looking items up by id, so refuse
            # ids that do not exist.
            if itemid not in self.items:
                raise Exception("Unknown item")
            to_index = toslot - 1
            if not 0 <= to_index <= len(inventories.get(toclient.id, [])):
                raise Exception("Index out of toplayers range")
            # The player's inventory is created on first use.
            inventories.setdefault(toclient.id, []).insert(to_index, itemid)
            return

        source = inventories.get(fromclient.id, [])
        if itemid not in source:
            raise Exception("he does not have that item")

        to_index = None
        if not to_master:
            to_index = toslot - 1
            if not 0 <= to_index <= len(inventories.get(toclient.id, [])):
                raise Exception("Index out of toplayers range")

        from_index = fromslot - 1
        if not 0 <= from_index < len(source):
            raise Exception("Index out of fromplayer range")
        if source[from_index] != itemid:
            # Never remove a different item than the one being moved.
            raise Exception("Item is not in the given slot")

        source.pop(from_index)
        if not to_master:
            inventories.setdefault(toclient.id, []).insert(to_index, itemid)

    @ApiMethod
    async def draw(self, clientid):
        """Send the current view to the client."""
        await self.send_state(clientid)

    async def _send_to_client(self, clientid, data):
        """Call ``send_json(data)`` on every socket registered for the client."""
        # Iterate over a snapshot: unsubscribe() may change the list while
        # this coroutine is suspended in an await.
        for socket in list(self.sockets.get(clientid, ())):
            await socket.send_json(data)

    async def send_lobby_view(self, clientid):
        """Send the lobby view: the client's name and all known sessions."""
        client = self.clients[clientid]
        data = {
            "view": "lobby",
            "username": client.name,
            "sessions": {
                session.id: {
                    "id": session.id,
                    "name": session.name,
                    "owned": session.owner == client.id,
                }
                for session in self.sessions.values()
            },
        }
        await self._send_to_client(clientid, data)

    async def send_master_view(self, clientid):
        """Send the owner's view: session, all items, inventories by name."""
        client = self.clients[clientid]
        session = self.sessions[client.session]
        # Serialise the items once instead of once per inventory entry.
        items = {key: item.to_json() for key, item in self.items.items()}

        data = {
            "view": "master",
            "username": client.name,
            "session": session.to_json(),
            "items": items,
            "inventories": {},
        }
        for playerid in session.inventories:
            player = self.clients[playerid]
            data["inventories"][player.name] = [
                items[itemid] for itemid in session.get_items(player.id)
            ]
        await self._send_to_client(clientid, data)

    async def send_session_view(self, clientid):
        """Send the view of a non-owner session member."""
        client = self.clients[clientid]
        session = self.sessions[client.session]
        data = {
            "view": "session",
            "username": client.name,
            "session": session.to_json(),
        }
        await self._send_to_client(clientid, data)

    async def send_state(self, clientid):
        """Send the view matching the client's situation.

        Clients in no session get the lobby view, the owner of a session the
        master view and every other member the session view.
        """
        # TODO: compute state, send to client
        client = self.clients[clientid]
        if not client.session:
            await self.send_lobby_view(clientid)
            return
        session = self.sessions[client.session]
        if not session:
            await self.send_lobby_view(clientid)
        elif session.owner == client.id:
            await self.send_master_view(clientid)
        else:
            await self.send_session_view(clientid)

    def save(self):
        """Write ``to_json()`` as JSON to ``self.filename``."""
        with open(self.filename, "w") as f:
            json.dump(self.to_json(), f)

    def exists_client(self, clientid: str) -> bool:
        """Return whether a client with this id is known."""
        return clientid in self.clients

    def create_client(self, name="Joe") -> str:
        """Create a client called *name*, register it and return its id.

        Raises:
            Exception: if *name* is empty.
        """
        if not name:
            raise Exception("Username cannot be empty!")
        client = Client()
        client.name = name
        self.clients[client.id] = client
        return client.id

    async def subscribe(self, clientid, socket):
        """Register *socket* for the client and send it the current view."""
        self.sockets.setdefault(clientid, []).append(socket)
        await self.send_state(clientid)

    def unsubscribe(self, socket):
        """Remove *socket* from every client's socket list that contains it."""
        for client in self.sockets:
            if socket in self.sockets[client]:
                self.sockets[client].remove(socket)
|
|
|
|
class Session:
    """A game session: id, name, owner, member ids and per-player inventories."""

    def __init__(self, model = None, owner = None, name = None):
        """Restore a session from *model*, or create one from owner and name.

        A non-empty *model* takes precedence and must provide ``"id"``,
        ``"name"``, ``"clients"`` and ``"owner"``; ``"inventories"`` is
        optional. Otherwise both *owner* and *name* must be truthy and a
        new id is generated.

        Raises:
            Exception: if neither a model nor both owner and name are given.
        """
        if model:
            self.id = model["id"]
            self.name = model["name"]
            self.clients = model["clients"]
            self.owner = model["owner"]
            self.inventories = model["inventories"] if "inventories" in model else {}
            return

        if not (owner and name):
            raise Exception("Illegal session constructor")

        self.id = generate_random_id()
        self.clients = []
        self.owner = owner
        self.name = name
        self.inventories = {}

    def get_items(self, playerid):
        """Return the player's inventory list, or an empty list if none."""
        return self.inventories.get(playerid, [])

    def give_item(self, playerid, itemid):
        """Append *itemid* to the player's inventory, creating it if needed."""
        self.inventories.setdefault(playerid, []).append(itemid)

    def to_json(self):
        """Return the session as a JSON-serialisable dict."""
        return dict(
            id=self.id,
            name=self.name,
            clients=self.clients,
            owner=self.owner,
            inventories=self.inventories,
        )
|
|
|
|
|
|
|
|
class Client:
    """A connected user: id, display name and the id of its current session."""

    def __init__(self, model = None):
        """Restore the client from a non-empty *model*, else create a new one.

        A restored client reads ``"id"``, ``"name"`` and ``"session"`` from
        the model. A new client gets a generated id, a placeholder name and
        an empty session id.
        """
        if not model:
            self.id = generate_random_id()
            self.name = "Default Client Name"
            self.session = ""
            return

        self.id = model["id"]
        self.name = model["name"]
        self.session = model["session"]

    def to_json(self):
        """Return the client as a JSON-serialisable dict."""
        return dict(id=self.id, name=self.name, session=self.session)
|
|
|
|
class Item:
    """An item definition: id, name, description, image and tags."""

    def __init__(self, model=None, name=None, description="", image="", tags=None):
        """Restore an item from *model*, or create a new one called *name*.

        Args:
            model: Non-empty mapping with ``"id"``, ``"name"``,
                ``"description"`` and ``"image"``; ``"tags"`` is optional.
                Takes precedence over the other arguments.
            name: Name of a new item; required when no model is given.
            description: Description of a new item.
            image: Image value of a new item, stored as given.
            tags: Tags of a new item. ``None`` (the default) gives the item
                its own empty dict, so items never share one default dict.

        Raises:
            Exception: if neither a model nor a name is given.
        """
        if model:
            self.id = model["id"]
            self.name = model["name"]
            self.description = model["description"]
            self.image = model["image"]
            self.tags = model["tags"] if "tags" in model else {}
        elif name:
            self.id = generate_random_id()
            self.name = name
            self.description = description
            self.image = image
            self.tags = {} if tags is None else tags
        else:
            raise Exception("Illegal Item Constructor")

    def to_json(self):
        """Return the item as a JSON-serialisable dict."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "image": self.image,
            "tags": self.tags,
        }
|