168 lines
4.9 KiB
Python
168 lines
4.9 KiB
Python
import random
|
|
import base64
|
|
import json
|
|
import os
|
|
import datetime
|
|
import hashlib
|
|
|
|
def generate_random_id(_s=set()):
|
|
new_id = base64.b32encode(bytearray(random.randint(0, 0xFF) for _ in range(32)))[:32].decode().lower()
|
|
while (new_id in _s):
|
|
new_id = base64.b32encode(bytearray(random.randint(0, 0xFF) for _ in range(32)))[:32].decode().lower()
|
|
_s.add(new_id)
|
|
return new_id
|
|
|
|
if not "WASCHMARKENSECRET" in os.environ:
|
|
print("Please set the environment variable WASCHMARKENSECRET first")
|
|
exit()
|
|
SECRET = os.environ["WASCHMARKENSECRET"]
|
|
|
|
class Model(object):
|
|
class ApiMethod:
|
|
dict = dict()
|
|
|
|
def __init__(self, fun):
|
|
self.dict[fun.__name__] = fun
|
|
|
|
def __contains__(self, item):
|
|
return item in self.dict
|
|
|
|
def __init__(self, model = dict()):
|
|
self.filename = "scoreboard.json"
|
|
if "users" not in model:
|
|
model["users"] = {}
|
|
#print(f"loaded model: {model}")
|
|
self.users = { uuid : User(model=model["users"][uuid]) for uuid in model["users"]}
|
|
|
|
#for _ in range(5):
|
|
# newuser = User()
|
|
# self.users[newuser.uuid] = newuser
|
|
|
|
self.update_secretdb()
|
|
|
|
print(f'Admin Token: {hashlib.sha256(SECRET.encode() + b"admintoken").hexdigest()}')
|
|
#for user in self.users.values():
|
|
# print(f"{user.name.rjust(25)} -> /dealer/{user.get_secret()}")
|
|
|
|
def update_secretdb(self):
|
|
self.secretlookup = { self.users[uuid].get_secret() : uuid for uuid in self.users }
|
|
|
|
def to_json(self):
|
|
model = {
|
|
"users": { uuid: self.users[uuid].to_json() for uuid in self.users },
|
|
}
|
|
#print(model)
|
|
return model
|
|
|
|
def verify_user(self, authtoken):
|
|
if authtoken in self.secretlookup:
|
|
return self.users[self.secretlookup[authtoken]]
|
|
raise Exception(f"Unauthorized user: {authtoken}")
|
|
|
|
def verify_admin(self, authtoken):
|
|
if authtoken == hashlib.sha256(SECRET.encode() + b"admintoken").hexdigest():
|
|
return
|
|
raise Exception(f"Unauthorized admin: {authtoken}")
|
|
|
|
def save(self):
|
|
with open(self.filename, "w") as f:
|
|
json.dump(self.to_json(), f, indent=2)
|
|
|
|
#
|
|
# Public API Methods
|
|
#
|
|
|
|
@ApiMethod
|
|
async def get_public_model(self):
|
|
return self.to_json()
|
|
|
|
#
|
|
# Authorized API Methods
|
|
#
|
|
|
|
@ApiMethod
|
|
async def get_user(self, authtoken):
|
|
user = self.verify_user(authtoken)
|
|
return user.to_json()
|
|
|
|
@ApiMethod
|
|
async def set_score(self, authtoken, newscore):
|
|
user = self.verify_user(authtoken)
|
|
if newscore <= user.maxscore:
|
|
if newscore < user.score:
|
|
raise Exception("Tried to lower its score")
|
|
else:
|
|
user.score = newscore
|
|
else:
|
|
raise Exception("Tried to raise user score above maxscore")
|
|
|
|
#
|
|
# Admin API Methods
|
|
#
|
|
|
|
@ApiMethod
|
|
async def get_secret(self, authtoken, uuid):
|
|
self.verify_admin(authtoken)
|
|
if uuid not in self.users:
|
|
raise Exception("Incorrect UUID")
|
|
user = self.users[uuid]
|
|
return {uuid: self.users[uuid].get_secret()}
|
|
|
|
@ApiMethod
|
|
async def add_user(self, authtoken, username):
|
|
self.verify_admin(authtoken)
|
|
if username == "":
|
|
raise Exception("Username can't be blank!")
|
|
newuser = User(username = username)
|
|
self.users[newuser.uuid] = newuser
|
|
self.update_secretdb()
|
|
|
|
@ApiMethod
|
|
async def rename_user(self, authtoken, uuid, newusername):
|
|
self.verify_admin(authtoken)
|
|
if uuid in self.users:
|
|
self.users[uuid].name = newusername
|
|
else:
|
|
raise Exception("No such user")
|
|
|
|
@ApiMethod
|
|
async def set_max_score(self, authtoken, uuid, newmaxscore):
|
|
self.verify_admin(authtoken)
|
|
if uuid not in self.users:
|
|
raise Exception("No such user")
|
|
user = self.users[uuid]
|
|
if newmaxscore < user.maxscore:
|
|
raise Exception("Tried to lower maxscore. That's bad...")
|
|
else:
|
|
user.score = user.maxscore
|
|
user.maxscore = newmaxscore
|
|
|
|
class User:
|
|
|
|
def __init__(self, username = "Default Username", model = None):
|
|
if model:
|
|
self.uuid = model["uuid"]
|
|
self.name = model["name"]
|
|
self.score = model["score"]
|
|
self.maxscore = model["maxscore"]
|
|
self.timeout = model["timeout"]
|
|
else:
|
|
self.uuid = generate_random_id()
|
|
self.name = username
|
|
self.score = 0
|
|
self.maxscore = 0
|
|
self.timeout = 0
|
|
|
|
def get_secret(self):
|
|
return hashlib.sha256(SECRET.encode() + str(self.uuid).encode()).hexdigest()
|
|
|
|
def to_json(self):
|
|
model = {
|
|
"uuid": self.uuid,
|
|
"name": self.name,
|
|
"score": self.score,
|
|
"maxscore": self.maxscore,
|
|
"timeout": self.timeout,
|
|
}
|
|
return model
|