import random import base64 import json import os import datetime import hashlib def generate_random_id(_s=set()): new_id = base64.b32encode(bytearray(random.randint(0, 0xFF) for _ in range(32)))[:32].decode().lower() while (new_id in _s): new_id = base64.b32encode(bytearray(random.randint(0, 0xFF) for _ in range(32)))[:32].decode().lower() _s.add(new_id) return new_id if not "WASCHMARKENSECRET" in os.environ: print("Please set the environment variable WASCHMARKENSECRET first") exit() SECRET = os.environ["WASCHMARKENSECRET"] class Model(object): class ApiMethod: dict = dict() def __init__(self, fun): self.dict[fun.__name__] = fun def __contains__(self, item): return item in self.dict def __init__(self, model = dict()): self.filename = "scoreboard.json" if "users" not in model: model["users"] = {} #print(f"loaded model: {model}") self.users = { uuid : User(model=model["users"][uuid]) for uuid in model["users"]} #for _ in range(5): # newuser = User() # self.users[newuser.uuid] = newuser self.update_secretdb() print(f'Admin Token: {hashlib.sha256(SECRET.encode() + b"admintoken").hexdigest()}') #for user in self.users.values(): # print(f"{user.name.rjust(25)} -> /dealer/{user.get_secret()}") def update_secretdb(self): self.secretlookup = { self.users[uuid].get_secret() : uuid for uuid in self.users } def to_json(self): model = { "users": { uuid: self.users[uuid].to_json() for uuid in self.users }, } #print(model) return model def verify_user(self, authtoken): if authtoken in self.secretlookup: return self.users[self.secretlookup[authtoken]] raise Exception(f"Unauthorized user: {authtoken}") def verify_admin(self, authtoken): if authtoken == hashlib.sha256(SECRET.encode() + b"admintoken").hexdigest(): return raise Exception(f"Unauthorized admin: {authtoken}") def save(self): with open(self.filename, "w") as f: json.dump(self.to_json(), f, indent=2) # # Public API Methods # @ApiMethod async def get_public_model(self): return self.to_json() # # Authorized API Methods # @ApiMethod async def get_user(self, authtoken): user = self.verify_user(authtoken) return user.to_json() @ApiMethod async def set_score(self, authtoken, newscore): user = self.verify_user(authtoken) if newscore <= user.maxscore: if newscore < user.score: raise Exception("Tried to lower its score") else: user.score = newscore else: raise Exception("Tried to raise user score above maxscore") # # Admin API Methods # @ApiMethod async def get_secret(self, authtoken, uuid): self.verify_admin(authtoken) if uuid not in self.users: raise Exception("Incorrect UUID") user = self.users[uuid] return {uuid: self.users[uuid].get_secret()} @ApiMethod async def add_user(self, authtoken, username): self.verify_admin(authtoken) if username == "": raise Exception("Username can't be blank!") newuser = User(username = username) self.users[newuser.uuid] = newuser self.update_secretdb() @ApiMethod async def rename_user(self, authtoken, uuid, newusername): self.verify_admin(authtoken) if uuid in self.users: self.users[uuid].name = newusername else: raise Exception("No such user") @ApiMethod async def set_max_score(self, authtoken, uuid, newmaxscore): self.verify_admin(authtoken) if uuid not in self.users: raise Exception("No such user") user = self.users[uuid] if newmaxscore < user.maxscore: raise Exception("Tried to lower maxscore. That's bad...") else: user.score = user.maxscore user.maxscore = newmaxscore class User: def __init__(self, username = "Default Username", model = None): if model: self.uuid = model["uuid"] self.name = model["name"] self.score = model["score"] self.maxscore = model["maxscore"] self.timeout = model["timeout"] else: self.uuid = generate_random_id() self.name = username self.score = 0 self.maxscore = 0 self.timeout = 0 def get_secret(self): return hashlib.sha256(SECRET.encode() + str(self.uuid).encode()).hexdigest() def to_json(self): model = { "uuid": self.uuid, "name": self.name, "score": self.score, "maxscore": self.maxscore, "timeout": self.timeout, } return model