Implement input poc

This commit is contained in:
Kai Vogelgesang 2022-10-27 18:08:27 +02:00
parent 612cf7ab48
commit eddd2b9739
4 changed files with 120 additions and 0 deletions

View File

@ -0,0 +1,20 @@
import asyncio
from hypercorn.config import Config
from hypercorn.asyncio import serve
from .web import app
from .pag import Runner
config = Config()
config.bind = ["0.0.0.0:8000"]
pag_runner = Runner()
pag_runner.start()
asyncio.run(serve(app, config))
pag_runner.stop()
print("stopping...")
pag_runner.join()

36
backend/backend/input.py Normal file
View File

@ -0,0 +1,36 @@
from enum import Enum
from dataclasses import dataclass
from typing import Literal
class Button(str, Enum):
UP = "up"
DOWN = "down"
LEFT = "left"
RIGHT = "right"
A = "a"
B = "b"
L = "l"
R = "r"
START = "start"
SELECT = "select"
Input = dict[Button, bool]
KEYMAP = {
Button.UP: "up",
Button.DOWN: "down",
Button.LEFT: "left",
Button.RIGHT: "right",
Button.A: "x",
Button.B: "z",
Button.L: "a",
Button.R: "s",
Button.START: "return",
Button.SELECT: "backspace",
}
@dataclass
class Event:
button: Button
direction: Literal["down", "up"]

53
backend/backend/pag.py Normal file
View File

@ -0,0 +1,53 @@
import pyautogui
from threading import Thread
from time import sleep
from .input import KEYMAP, Input, Button, Event
class InputHandler:
def __init__(self):
self.current: Input = {button: False for button in Button}
self.previous: Input = {button: False for button in Button}
print(f"sanity check: {self.previous=}, {self.current=}")
def set(self, input: Input):
# MUST NOT BLOCK
# to be called from asyncio context
self.current = input
def get(self) -> list[Event]:
events = []
for button in Button:
match (self.previous[button], self.current[button]):
case (True, False):
events.append(Event(button, "up"))
case (False, True):
events.append(Event(button, "down"))
self.previous = self.current
return events
input = InputHandler()
class Runner(Thread):
def __init__(self):
super().__init__()
self.running = True
def stop(self):
self.running = False
def run(self):
while self.running:
events = input.get()
for event in events:
print(f"[pag] key{event.direction} {event.button}")
if event.direction == "down":
pyautogui.keyDown(KEYMAP[event.button])
else:
pyautogui.keyUp(KEYMAP[event.button])
sleep(0.05)

View File

@ -1,7 +1,12 @@
import asyncio
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from .pag import input as pag_input
from .input import Button
app = FastAPI()
backend = FastAPI()
@ -12,10 +17,16 @@ async def client_handler(socket: WebSocket):
print("WS opened")
await socket.accept()
input = {button: False for button in Button}
while True:
try:
data = await socket.receive_json()
print(f"WS data: {data!r}")
button = data["button"]
pag_input.set({**input, button: True})
await asyncio.sleep(0.1)
pag_input.set({**input, button: False})
except WebSocketDisconnect:
break