207 lines
5.9 KiB
Python
207 lines
5.9 KiB
Python
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from dataclasses import dataclass
|
|
import random
|
|
from typing import Callable, Awaitable
|
|
|
|
from fastapi import WebSocket, WebSocketDisconnect
|
|
|
|
from .settings import settings
|
|
from .input import EMPTY_INPUT, Button, Input
|
|
from .pag import input as pag_input
|
|
|
|
# Awaitable getter, called with no arguments, that yields the inputs of the
# players currently taken into account.
# NOTE: the empty parameter list is spelled `[]`; `[None]` would describe a
# callable that takes one argument of type None.
InputGetter = Callable[[], Awaitable[list[Input]]]

# Callback that receives the single combined input a mode decided on.
OutputSetter = Callable[[Input], None]

# A game mode: a coroutine function that is handed the getter and the setter.
# It is awaited as a task, hence the Awaitable return type.
ModeFunction = Callable[[InputGetter, OutputSetter], Awaitable[None]]
|
|
|
|
|
|
@dataclass
class GameMode:
    """A registered game mode.

    Fields are positional in the generated constructor, in the order
    (name, mode_function, allow_multitouch).
    """

    # Name of the mode; published to clients as the current / next mode.
    name: str

    # Coroutine function implementing the mode; it is started as a task with
    # an input getter and an output setter.
    mode_function: ModeFunction

    # Whether a player may hold more than one button at once in this mode.
    allow_multitouch: bool
|
|
|
|
|
|
@dataclass
class State:
    """Shared state snapshot that is sent to every connected client.

    The instance's ``__dict__`` is sent as-is, so the field names are the
    keys the clients receive; do not rename them.
    """

    # Name of the mode that is currently running.
    mode: str

    # Multitouch flag published alongside the mode name.
    allowMultitouch: bool | None

    # Name of the mode that will run after the next switch.
    nextMode: str

    # Seconds remaining until the next mode switch.
    timeUntilNextMode: float

    # Per-button tally over the inputs of the active players.
    votes: dict[Button, int]

    # playerIdle: bool  -- not stored here; it is added per client when the
    # state is sent, because it differs for each client.
|
|
|
|
|
|
class ClientState:
    """Per-connection bookkeeping: latest input plus an idle timer."""

    def __init__(self):
        self.is_active = False
        self.inactivity_task = None
        self.current_input = EMPTY_INPUT

    def on_input(self, input: Input):
        """Store a new input, mark the client active and restart the idle timer.

        Must be called while an event loop is running, because it schedules
        the idle timer as a task.
        """
        # Drop the previous countdown (if any) before arming a new one.
        pending = self.inactivity_task
        if pending:
            pending.cancel()

        self.is_active = True
        self.current_input = input
        self.inactivity_task = asyncio.create_task(self.deactivator())

    async def deactivator(self):
        """Mark the client idle once the configured idle timeout has elapsed."""
        await asyncio.sleep(settings.client_idle_timeout)
        # Only reached when the sleep was not cancelled by a newer input.
        self.inactivity_task = None
        self.is_active = False
|
|
|
|
|
|
class Arbiter:
    """Collects client inputs, runs one game mode at a time and rotates modes.

    Modes are registered with the :meth:`mode` decorator. :meth:`main_loop`
    switches to a new mode every ``settings.arbiter_mode_switch_cycle``
    seconds and sends the shared state to all clients every
    ``settings.arbiter_tick_cycle`` seconds.
    """

    def __init__(self):
        self.modes: list[GameMode] = []
        self.state = State("", False, "", 0, dict())
        self.current_mode = None
        # Initialised here so the attribute exists before main_loop() runs.
        self.next_mode = None
        self.current_mode_task = None
        self.modeswitch_time = None
        self.clients: dict[WebSocket, ClientState] = dict()

    def mode(self, name: str, allow_multitouch: bool = True):
        """Return a decorator that registers a mode function under ``name``.

        Args:
            name: Name of the mode as published to clients.
            allow_multitouch: Whether players may hold several buttons at
                once while this mode is running.
        """

        def inner(f: ModeFunction):
            self.modes.append(GameMode(name, f, allow_multitouch))
            # Hand the function back so the decorated name is not rebound
            # to None.
            return f

        return inner

    async def handle_socket(self, socket: WebSocket):
        """Serve one client connection until it disconnects.

        Every received JSON message is stored as the client's current input.
        The client is always unregistered on exit, including when receiving
        fails with something other than a regular disconnect; such errors
        still propagate to the caller.
        """
        client = ClientState()
        self.clients[socket] = client
        try:
            while True:
                data = await socket.receive_json()
                client.on_input(data)
        except WebSocketDisconnect:
            pass
        finally:
            self.clients.pop(socket, None)
            # The idle timer has nothing left to deactivate.
            if client.inactivity_task:
                client.inactivity_task.cancel()

    async def get_input(self):
        """Return the inputs of all eligible players and refresh the votes.

        A player is eligible when it is active and does not press more
        buttons than the current mode permits. Must only be called while a
        mode is running (``current_mode`` is set).
        """
        # Without multitouch at most one button may be pressed.
        # NOTE(review): 10 is used as "no practical limit" -- assumes there
        # are no more than 10 buttons; confirm against Button.
        allowed_multitouch = 10 if self.current_mode.allow_multitouch else 1

        # filter players
        # NOTE(review): summing the values counts pressed buttons only if
        # the input values are booleans / 0-1 -- confirm against Input.
        active_player_inputs = [
            client.current_input
            for client in self.clients.values()
            if client.is_active
            and sum(client.current_input.values()) <= allowed_multitouch
        ]

        # update vote histogram
        self.state.votes = {
            button: sum(input[button] for input in active_player_inputs)
            for button in Button
        }

        return active_player_inputs

    def set_output(self, output: Input):
        """Forward the input chosen by the running mode to ``pag_input``."""
        pag_input.set(output)

    def update_next_mode(self):
        """Pick the mode to run after the current one and publish its name.

        A mode other than the current one is chosen at random; if the
        current mode is the only one registered, it is repeated.

        Raises:
            RuntimeError: If no mode has been registered.
        """
        if not self.modes:
            raise RuntimeError("no game modes registered")

        choices = [mode for mode in self.modes if mode != self.current_mode]
        self.next_mode = random.choice(choices) if choices else self.current_mode
        self.state.nextMode = self.next_mode.name

    async def main_loop(self):
        """Run forever: rotate modes and send state updates to the clients."""
        # current_mode is None / "" at this point, so any mode may be picked.
        self.update_next_mode()

        while True:
            # switch modes
            self.current_mode = self.next_mode
            self.state.mode = self.current_mode.name
            # Keep the published multitouch flag in sync with the mode.
            self.state.allowMultitouch = self.current_mode.allow_multitouch
            self.update_next_mode()
            self.modeswitch_time = datetime.now() + timedelta(
                seconds=settings.arbiter_mode_switch_cycle
            )

            # Stop the previous mode before starting the new one.
            if self.current_mode_task:
                self.current_mode_task.cancel()
            self.current_mode_task = asyncio.create_task(
                self.current_mode.mode_function(self.get_input, self.set_output)
            )

            # send updates until next mode switch
            while (now := datetime.now()) < self.modeswitch_time:
                self.state.timeUntilNextMode = (
                    self.modeswitch_time - now
                ).total_seconds()

                # The sleep paces the loop; return_exceptions=True keeps one
                # failing socket from aborting the tick for everyone else.
                await asyncio.gather(
                    asyncio.sleep(settings.arbiter_tick_cycle),
                    *[
                        socket.send_json(
                            {
                                **self.state.__dict__,
                                "playerIdle": not client.is_active,
                            }
                        )
                        for socket, client in self.clients.items()
                    ],
                    return_exceptions=True,
                )
|
|
|
|
|
|
# Module-level Arbiter instance; the game modes in this module register
# themselves on it through the @arbiter.mode decorator.
arbiter = Arbiter()
|
|
|
|
|
|
@arbiter.mode("democracy", allow_multitouch=False)
async def _(get_input: InputGetter, set_output: OutputSetter):
    """Democracy mode: every vote cycle, output the most-voted button.

    Each player's vote is the first pressed button of their input, or
    "none" when nothing is pressed. Ties are broken at random. If "none"
    wins, or nobody is playing, no button is pressed.
    """
    while True:
        await asyncio.sleep(settings.democracy_vote_cycle)

        ballots: list[Input] = await get_input()
        if not ballots:
            set_output(EMPTY_INPUT)
            continue

        # One counter per button, plus an extra one for "pressed nothing".
        tally = dict.fromkeys(Button, 0)
        tally["none"] = 0

        for ballot in ballots:
            # since multitouch is not allowed, we can assume that
            # at most one entry is true
            pressed = next(
                (button for button in Button if ballot[button]), "none"
            )
            tally[pressed] += 1

        top = max(tally.values())
        leaders = [key for key, count in tally.items() if count == top]
        winner = random.choice(leaders)

        result = dict.fromkeys(Button, False)
        if winner != "none":
            result[winner] = True

        set_output(result)
|
|
|
|
|
|
@arbiter.mode("anarchy", allow_multitouch=False)
async def _(get_input: InputGetter, set_output: OutputSetter):
    """Anarchy mode: every cycle, output one randomly chosen player's input.

    When there are no inputs to choose from, the empty input is emitted.
    The cycle length reuses ``settings.democracy_vote_cycle``.
    """
    while True:
        await asyncio.sleep(settings.democracy_vote_cycle)

        inputs: list[Input] = await get_input()

        # random.choice() raises IndexError on an empty list, which would
        # kill this mode's task; only draw when there is something to draw.
        the_input = random.choice(inputs) if inputs else None

        if not the_input:
            set_output(EMPTY_INPUT)
            continue

        output = {button: the_input[button] for button in Button}

        set_output(output)
|