waschmarken/model.py
2020-07-18 01:03:44 +02:00

152 lines
4.4 KiB
Python

import random
import base64
import json
import os
import datetime
import hashlib
def generate_random_id(_s=set()):
    """Return a new 32-character lowercase base32 identifier.

    Args:
        _s: Registry of identifiers already handed out. The mutable default
            is intentional: it is the process-wide memory that keeps two
            calls from ever returning the same id. Pass your own set to use
            a separate registry.

    Returns:
        A 32-character string over the alphabet ``a-z2-7`` that was not in
        ``_s`` before the call; it is added to ``_s`` before returning.
    """
    # Function-scope import so this block does not depend on the file's
    # import section being changed.
    import secrets

    while True:
        # 20 bytes encode to exactly 32 base32 characters with no padding,
        # which is the same amount of entropy the previous [:32] slice kept.
        # The bytes come from the OS CSPRNG rather than the Mersenne Twister.
        new_id = base64.b32encode(secrets.token_bytes(20)).decode().lower()
        if new_id not in _s:
            _s.add(new_id)
            return new_id
# The secret is mandatory: it is hashed into every user token and into the
# admin token, so refuse to start without it.
SECRET = os.environ.get("WASCHMARKENSECRET")
if SECRET is None:
    print("Please set the environment variable WASCHMARKENSECRET first")
    exit()
class Model(object):
    """In-memory scoreboard state plus the registry of its API methods.

    Holds the users keyed by uuid, a reverse lookup from each user's secret
    token to their uuid, and the file name used by ``save``.
    """

    class ApiMethod:
        """Decorator that records a function in a shared name -> function registry.

        Because the decorator is a class, the name bound in the decorated
        scope is the ``ApiMethod`` instance rather than the function; the
        function itself is retrieved through ``ApiMethod.dict``.
        """

        # Class-level on purpose: this single mapping is the registry shared
        # by every decorated function. The attribute name is kept for
        # compatibility even though it shadows the builtin inside this class.
        dict = {}

        def __init__(self, fun):
            self.dict[fun.__name__] = fun

        def __contains__(self, item):
            return item in self.dict

    def __init__(self, model=None):
        """Build the model from a serialised dict.

        Args:
            model: Mapping in the shape produced by ``to_json``. ``None`` or
                a mapping without a ``"users"`` key yields a model with no
                users.
        """
        # Work on a shallow copy: neither a shared default nor the caller's
        # dict is modified when the "users" key has to be filled in.
        model = {} if model is None else dict(model)
        model.setdefault("users", {})

        self.filename = "scoreboard.json"
        print(f"loaded model: {model}")
        self.users = {
            uuid: User(model=entry) for uuid, entry in model["users"].items()
        }
        self.secretlookup = {
            user.get_secret(): uuid for uuid, user in self.users.items()
        }
        for user in self.users.values():
            print(f"{user.name.rjust(25)} -> /dealer/{user.get_secret()}")

    def _register_user(self, user):
        """Store ``user`` and index its secret so ``verify_user`` can find it."""
        self.users[user.uuid] = user
        self.secretlookup[user.get_secret()] = user.uuid

    def to_json(self):
        """Return the whole model as a JSON-serialisable dict (and print it)."""
        model = {
            "users": {uuid: user.to_json() for uuid, user in self.users.items()},
        }
        print(model)
        return model

    def verify_user(self, authtoken):
        """Return the user owning ``authtoken``.

        Raises:
            Exception: if the token is not in the secret lookup.
        """
        if authtoken in self.secretlookup:
            return self.users[self.secretlookup[authtoken]]
        raise Exception(f"Unauthorized user: {authtoken}")

    def verify_admin(self, authtoken):
        """Return ``None`` if ``authtoken`` is the admin token.

        The admin token is the SHA-256 hex digest of the module secret
        followed by the bytes ``admintoken``.

        Raises:
            Exception: if the token does not match.
        """
        if authtoken == hashlib.sha256(SECRET.encode() + b"admintoken").hexdigest():
            return
        raise Exception(f"Unauthorized admin: {authtoken}")

    def save(self):
        """Write the serialised model to ``self.filename`` as indented JSON."""
        with open(self.filename, "w") as f:
            json.dump(self.to_json(), f, indent=2)

    #
    # Public API Methods
    #

    @ApiMethod
    async def get_public_model(self):
        """Return the serialised model; no token is required."""
        return self.to_json()

    #
    # Authorized API Methods
    #

    @ApiMethod
    async def get_user(self, authtoken):
        """Return the serialised user that owns ``authtoken``."""
        user = self.verify_user(authtoken)
        return user.to_json()

    @ApiMethod
    async def set_score(self, authtoken, newscore):
        """Set the calling user's score; it may not decrease or exceed maxscore.

        Raises:
            Exception: on an unknown token, a score above ``maxscore``, or a
                score lower than the current one.
        """
        user = self.verify_user(authtoken)
        # Written as "not <=" rather than ">" so values that compare false
        # both ways (e.g. NaN) are still rejected, exactly as before.
        if not (newscore <= user.maxscore):
            raise Exception("Tried to raise user score above maxscore")
        if newscore < user.score:
            raise Exception("Tried to lower its score")
        user.score = newscore

    #
    # Admin API Methods
    #

    @ApiMethod
    async def add_user(self, authtoken, username):
        """Create a user named ``username``; requires the admin token."""
        self.verify_admin(authtoken)
        # Register through the helper so the new user's secret is usable
        # immediately, not only after the model is rebuilt from disk.
        self._register_user(User(username=username))

    @ApiMethod
    async def rename_user(self, authtoken, uuid, newusername):
        """Rename the user ``uuid``; requires the admin token.

        Raises:
            Exception: if the token is wrong or no user has that uuid.
        """
        self.verify_admin(authtoken)
        if uuid not in self.users:
            raise Exception("No such user")
        self.users[uuid].name = newusername

    @ApiMethod
    async def set_max_score(self, authtoken, uuid, newmaxscore):
        """Set the score ceiling of user ``uuid``; it may not decrease.

        Raises:
            Exception: if the token is wrong, the uuid is unknown, or
                ``newmaxscore`` is below the current ceiling.
        """
        self.verify_admin(authtoken)
        if uuid not in self.users:
            raise Exception("No such user")
        user = self.users[uuid]
        if newmaxscore < user.maxscore:
            raise Exception("Tried to lower maxscore. That's bad...")
        user.maxscore = newmaxscore
class User:
    """One scoreboard entry: uuid, display name, score, score ceiling, timeout."""

    # Attribute names, in the order they are read from and written to the
    # serialised form.
    _FIELDS = ("uuid", "name", "score", "maxscore", "timeout")

    def __init__(self, username="Default Username", model=None):
        """Create a fresh user, or restore one from its serialised dict.

        Args:
            username: Display name for a freshly created user; ignored when
                ``model`` is given.
            model: Dict with the keys ``uuid``, ``name``, ``score``,
                ``maxscore`` and ``timeout``. A falsy value (``None`` or an
                empty dict) creates a fresh user instead.
        """
        if not model:
            # Fresh user: new random id, all counters start at zero.
            self.uuid = generate_random_id()
            self.name = username
            self.score = self.maxscore = self.timeout = 0
            return

        for field in self._FIELDS:
            setattr(self, field, model[field])

    def get_secret(self):
        """Return the SHA-256 hex digest of the module secret followed by the uuid."""
        hasher = hashlib.sha256(SECRET.encode())
        hasher.update(str(self.uuid).encode())
        return hasher.hexdigest()

    def to_json(self):
        """Return a new dict holding this user's serialisable fields."""
        return {field: getattr(self, field) for field in self._FIELDS}