ManInTheMiddleAmongUs/old/forwarding.py

53 lines
1.7 KiB
Python

import asyncio
import signal
async def a2b(a_reader: asyncio.StreamReader, b_writer: asyncio.StreamWriter, tag: str, queue: asyncio.Queue):
while True:
data = await a_reader.read(65536)
if not data:
break
await queue.put((tag, data))
b_writer.write(data)
class Forwarder:
def __init__(self, listen_host: str, listen_port: int, server_host: str, server_port: int, queue: asyncio.Queue):
self.listen_host = listen_host
self.listen_port = listen_port
self.server_host = server_host
self.server_port = server_port
self.queue = queue
async def handle_client(self, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter):
server_reader, server_writer = await asyncio.open_connection(self.server_host, self.server_port)
s2c = asyncio.create_task(a2b(server_reader, client_writer, 'server', self.queue))
c2s = asyncio.create_task(a2b(client_reader, server_writer, 'client', self.queue))
_, pending = await asyncio.wait([s2c, c2s], return_when=asyncio.FIRST_COMPLETED)
for future in pending:
future.cancel()
if not client_writer.is_closing():
client_writer.close()
if not server_writer.is_closing():
server_writer.close()
await asyncio.wait([client_writer.wait_closed(), server_writer.wait_closed()])
async def run(self):
srv = await asyncio.start_server(self.handle_client, host=self.listen_host, port=self.listen_port)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, lambda: srv.close())
try:
await srv.serve_forever()
except asyncio.CancelledError:
pass