"""Declarative matcher for binary packets.

A packet layout ("spec") is a name plus a sequence of field specs.  Each
field spec is one of:

* an ``int``   -- a single byte that must have exactly this value;
* a 3-tuple ``(name, length, extractor)`` -- a field of ``length`` bytes,
  where ``length`` is either an ``int`` or the name of an earlier field whose
  (extracted) value is used as the length;
* ``None``     -- take everything that is left and stop.
"""

from typing import Tuple, Dict, List, Union, Callable, Optional, Any, Sequence

Extractor = Callable[[bytes], Any]


class ParseError(AssertionError):
    """Raised when packet data does not satisfy a spec.

    Subclasses ``AssertionError`` so callers that caught the assertion
    failures raised by earlier versions keep working, while the checks
    themselves no longer disappear under ``python -O``.
    """


def int_big_endian(data: bytes) -> int:
    """Interpret *data* as an unsigned big-endian integer."""
    return int.from_bytes(data, "big")


FieldSpec = Union[
    # specific value to be expected
    int,
    # [named] field with fixed length
    Tuple[Optional[str], int, Optional[Extractor]],
    # [named] field with length backreference
    Tuple[Optional[str], str, Optional[Extractor]],
    # parse until end
    None,
]
# ``Parser.register`` stores the ``*fields`` tuple as-is, hence ``Sequence``.
Spec = Tuple[str, Sequence[FieldSpec]]


class Buffer:
    """Byte buffer that is consumed from the front."""

    def __init__(self, data: bytes):
        # Bytes that have not been consumed yet.
        self.data = data

    def consume(self, n: int) -> bytes:
        """Remove and return the first *n* bytes.

        Raises:
            ParseError: if *n* is negative or exceeds the remaining length.
        """
        # A negative n would slice from the wrong end (data[:-1]) instead of
        # failing, so it has to be rejected explicitly.
        if not 0 <= n <= len(self.data):
            raise ParseError(
                f"cannot consume {n} byte(s), {len(self.data)} available"
            )
        result, self.data = self.data[:n], self.data[n:]
        return result


class Parser:
    """Registry of packet specs that classifies raw packets."""

    def __init__(self):
        # Registered specs, in registration order.
        self.specs: List[Spec] = []

    def register(self, name: str, *fields: FieldSpec):
        """Register a packet layout called *name* made of *fields*."""
        self.specs.append((name, fields))

    def parse_packet(self, data: bytes) -> Dict[str, Any]:
        """Match *data* against every registered spec.

        Returns:
            The ``match_spec`` result of the single matching spec;
            ``{"type": "ambiguous", "data": [data]}`` if more than one spec
            matches; ``{"type": "unknown", "data": [data]}`` if none does.
        """
        result: Optional[Dict[str, Any]] = None
        for spec in self.specs:
            try:
                match = self.match_spec(spec, data)
            except AssertionError:
                # Also covers ParseError: this spec does not fit the data.
                continue
            if result is not None:
                return {"type": "ambiguous", "data": [data]}
            result = match
        if result is None:
            return {"type": "unknown", "data": [data]}
        return result

    @staticmethod
    def match_spec(spec: Spec, data: bytes) -> Dict[str, Any]:
        """Parse *data* according to *spec*.

        Returns:
            A dict with ``"type"`` (the spec name), ``"data"`` (the parsed
            values in field order) and ``"fields"`` (named values by name).
            Bytes left over after the last field are ignored.

        Raises:
            ParseError: if the data is too short, a literal byte differs, or
                a length backreference does not resolve to an ``int``.
            KeyError: if a length backreference names a field that has not
                been captured earlier in the spec.
        """
        buffer = Buffer(data)
        backref: Dict[str, Any] = {}
        typename, fields = spec
        values: List[Any] = []

        for fieldspec in fields:
            if isinstance(fieldspec, int):
                actual = buffer.consume(1)[0]
                if actual != fieldspec:
                    raise ParseError(
                        f"expected byte {fieldspec!r}, got {actual!r}"
                    )
                # NOTE(review): the original had a "???" marker on this
                # append -- confirm that literals belong in the output.
                values.append(fieldspec)
                continue

            if fieldspec is None:
                values.append(buffer.data)
                break  # TODO implement unknown blob can also be in the middle

            if isinstance(fieldspec, tuple):
                fieldname, fieldlen, extractor = fieldspec
                # backreference
                if isinstance(fieldlen, str):
                    fieldlen = backref[fieldlen]
                if not isinstance(fieldlen, int):
                    raise ParseError(
                        f"field length must be an int, got {fieldlen!r}"
                    )
                fielddata: Any = buffer.consume(fieldlen)
                if extractor:
                    fielddata = extractor(fielddata)
                if fieldname:
                    backref[fieldname] = fielddata
                values.append(fielddata)

        return {"type": typename, "data": values, "fields": backref}