From 5e6b6a299bb01ea1ed8c7b0fa2f6b2cee657b0fc Mon Sep 17 00:00:00 2001
From: Kai Vogelgesang
Date: Sun, 11 Oct 2020 05:00:10 +0200
Subject: [PATCH] Implement Parser

---
Review notes (not part of the commit message):
* protoparser: spec mismatches raise SpecMismatch (an AssertionError
  subclass) instead of relying on assert, which python -O strips; a
  ValueError from an extractor (e.g. bytes.decode on non-UTF-8 bytes) is
  now treated as a mismatch instead of escaping parse_packet.
* proto: enum members are plain ints rather than one-element tuples.
* analyze: empty payloads and blank lines are skipped.
* Blob ids on the index lines are unchanged from the previous revision.

 analyze.py     |  52 ++++++++++++++++++
 handler.py     |  27 ++++++++--
 proto.py       |  42 +++++++++++++++
 protoparser.py | 140 +++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 258 insertions(+), 3 deletions(-)
 create mode 100644 analyze.py
 create mode 100644 proto.py
 create mode 100644 protoparser.py

diff --git a/analyze.py b/analyze.py
new file mode 100644
index 0000000..353b2a6
--- /dev/null
+++ b/analyze.py
@@ -0,0 +1,52 @@
+import binascii
+
+
+class Analyzer:
+    """Count how often each first payload byte occurs."""
+
+    def __init__(self):
+        # first payload byte (int) -> number of entries seen
+        self.stats = dict()
+
+    def consume(self, tag, data):
+        """Record one log entry; ``data`` is the hex-encoded payload."""
+        if tag == "stdin":
+            return
+
+        payload = binascii.unhexlify(data)
+        if not payload:
+            # nothing to classify, and payload[0] would raise IndexError
+            return
+
+        first_byte = payload[0]
+        self.stats[first_byte] = self.stats.get(first_byte, 0) + 1
+
+
+def run(filename):
+    """Feed every "<tag> <data>" line of ``filename`` to an Analyzer.
+
+    Prints one "byte: count" line per distinct first byte and returns the
+    Analyzer.
+    """
+    analyzer = Analyzer()
+
+    with open(filename, "r") as f:
+        # iterate lazily instead of loading the whole file via readlines()
+        for line in f:
+            line = line.rstrip("\n")
+            if not line:
+                # tolerate blank lines instead of failing to unpack below
+                continue
+            tag, data = line.split(" ", 1)
+            analyzer.consume(tag, data)
+
+    for k, v in analyzer.stats.items():
+        print(f"{k}: {v}")
+
+    return analyzer
+
+
+if __name__ == "__main__":
+    import sys
+
+    run(sys.argv[1])
diff --git a/handler.py b/handler.py
index a1b2ff6..20ca27c 100644
--- a/handler.py
+++ b/handler.py
@@ -1,15 +1,14 @@
 import binascii
 import itertools
 
+import proto
 from colordiff import Color, Diff
 
 d_c = Diff(Color.GREEN, Color.RED)
 d_s = Diff(Color.BLUE, Color.YELLOW)
 
 
-def handle(tag: str, data: str):
-    if tag == "stdin":
-        print(f"[stdin] {data}")
+def handle3(tag: str, data: str):
     if data.startswith("0c") or data.startswith("0a"):
         if tag == "server":
             print(f"{Color.YELLOW}{data}{Color.RESET}")
@@ -20,3 +19,25 @@ def handle(tag: str, data: str):
             d_s.consume(data)
         elif tag == "client":
             d_c.consume(data)
+
+
+def handle2(tag: str, data: str):
+    if tag == "stdin":
+        # print(f"[stdin] {data}")
+        return
+
+    d_bytes = binascii.unhexlify(data)
+    if d_bytes[0] not in (8, 9):
+        return
+
+    print(f"[{tag}] {data}")
+
+
+def handle(tag: str, data: str):
+    if tag == "stdin":
+        print(f"# {data}")
+        return
+
+    d_bytes = binascii.unhexlify(data)
+    pkt = proto.Parser.parse_packet(d_bytes)
+    print(pkt)
diff --git a/proto.py b/proto.py
new file mode 100644
index 0000000..87f9269
--- /dev/null
+++ b/proto.py
@@ -0,0 +1,42 @@
+import enum
+import binascii
+
+import protoparser
+
+
+class HazelPacketType(enum.IntEnum):
+    """Leading byte values that identify a packet type in the specs below."""
+
+    UNRELIABLE = 0
+    RELIABLE = 1
+
+    HELLO = 8
+    PING = 12
+    ACK = 10
+    FIN = 9
+
+    FRAGMENT = 11  # not observed yet, maybe unused in Among Us?
+
+
+def int_big_endian(data: bytes) -> int:
+    """Interpret ``data`` as an unsigned big-endian integer."""
+    return int.from_bytes(data, "big")
+
+
+Parser = protoparser.Parser()
+
+# type byte, 2-byte nonce
+Parser.register("Ping", HazelPacketType.PING, ("nonce", 2, int_big_endian))
+# type byte, 2-byte nonce, literal 0xFF
+Parser.register("Ack", HazelPacketType.ACK, ("nonce", 2, int_big_endian), 0xFF)
+# type byte only
+Parser.register("Fin", HazelPacketType.FIN)
+
+# type byte, 7 unnamed bytes, 1-byte name length, then the name itself
+Parser.register(
+    "Hello",
+    HazelPacketType.HELLO,
+    (None, 7, None),
+    ("name_len", 1, int_big_endian),
+    ("name", "name_len", bytes.decode),
+)
diff --git a/protoparser.py b/protoparser.py
new file mode 100644
index 0000000..e4be136
--- /dev/null
+++ b/protoparser.py
@@ -0,0 +1,140 @@
+from typing import Tuple, Dict, List, Union, Callable, Optional, Any
+
+Extractor = Callable[[bytes], Any]
+
+
+def int_big_endian(data: bytes) -> int:
+    return int.from_bytes(data, "big")
+
+
+FieldSpec = Union[
+    # specific value to be expected
+    int,
+    # [named] field with fixed length
+    Tuple[Optional[str], int, Optional[Extractor]],
+    # [named] field with length backreference
+    Tuple[Optional[str], str, Optional[Extractor]],
+    # parse until end
+    None,
+]
+
+# register() stores its variadic fields as a tuple, hence Tuple[..., ...]
+Spec = Tuple[str, Tuple[FieldSpec, ...]]
+
+
+class SpecMismatch(AssertionError):
+    """Raised when packet data does not fit a spec.
+
+    Subclasses AssertionError so callers that catch AssertionError keep
+    working, while the checks stay active under ``python -O``.
+    """
+
+
+class Buffer:
+    """Cursor over a bytes object that hands out consecutive slices."""
+
+    def __init__(self, data: bytes):
+        self.data = data
+
+    def consume(self, n: int) -> bytes:
+        """Remove and return the next ``n`` bytes.
+
+        Raises SpecMismatch if fewer than ``n`` bytes remain.
+        """
+        if n > len(self.data):
+            raise SpecMismatch(f"need {n} bytes, have {len(self.data)}")
+        result, self.data = self.data[:n], self.data[n:]
+        return result
+
+
+class Parser:
+    """Registry of packet specs used to classify raw packets."""
+
+    def __init__(self):
+        self.specs = list()
+
+    def register(self, name: str, *fields: FieldSpec):
+        """Add a packet type ``name`` described by ``fields`` in wire order."""
+        self.specs.append((name, fields))
+
+    def parse_packet(self, data: bytes):
+        """Match ``data`` against every registered spec.
+
+        Returns the result of the single matching spec, a dict of type
+        "unknown" if none matches, or of type "ambiguous" if several do.
+        """
+        result = None
+        for spec in self.specs:
+            try:
+                m = self.match_spec(spec, data)
+            except AssertionError:
+                continue
+            if m:
+                if result:
+                    return {"type": "ambiguous", "data": [data]}
+                result = m
+
+        if not result:
+            return {"type": "unknown", "data": [data]}
+
+        return result
+
+    @staticmethod
+    def match_spec(spec: Spec, data: bytes):
+        """Parse ``data`` according to ``spec``.
+
+        Returns a dict with the spec name under "type", the parsed values
+        in wire order under "data" and the named values under "fields".
+        Raises SpecMismatch if the data does not fit. Bytes left over after
+        the last field are ignored.
+        """
+
+        buffer = Buffer(data)
+
+        backref: Dict[str, Any] = dict()
+
+        typename, fields = spec
+
+        values: List[Any] = list()
+
+        for fieldspec in fields:
+            if isinstance(fieldspec, int):
+                # literal byte that must appear at this position
+                actual = buffer.consume(1)[0]
+                if actual != fieldspec:
+                    raise SpecMismatch(f"expected {fieldspec}, got {actual}")
+
+                values.append(fieldspec)
+
+                continue
+
+            if fieldspec is None:
+                values.append(buffer.data)
+                break  # TODO implement unknown blob can also be in the middle
+
+            if isinstance(fieldspec, tuple):
+                fieldname, fieldlen, extractor = fieldspec
+
+                # backreference
+                if isinstance(fieldlen, str):
+                    fieldlen = backref[fieldlen]
+
+                if not isinstance(fieldlen, int):
+                    raise SpecMismatch(f"length of {fieldname!r} is not an int")
+
+                fielddata = buffer.consume(fieldlen)
+
+                if extractor:
+                    try:
+                        fielddata = extractor(fielddata)
+                    except ValueError as err:
+                        # e.g. UnicodeDecodeError from bytes.decode: the
+                        # bytes do not fit this spec, so report a mismatch
+                        raise SpecMismatch(str(err)) from err
+
+                if fieldname:
+                    backref[fieldname] = fielddata
+
+                values.append(fielddata)
+
+        return {"type": typename, "data": values, "fields": backref}