ManInTheMiddleAmongUs/protoparser.py
2020-10-11 22:54:40 +02:00

128 lines
3.1 KiB
Python

import binascii
import enum
from typing import Tuple, Dict, List, Union, Callable, Optional, Any, Type
# Callable that turns the raw bytes of one field into the value that is
# stored on the resulting packet.
Extractor = Callable[[bytes], Any]

# One entry of a packet specification. The tuple forms are
# (name or None, length, extractor or None).
FieldSpec = Union[
    bytes,  # literal bytes that must be present at this position
    Tuple[Union[str, None], int, Union[Extractor, None]],  # fixed-length field
    Tuple[Union[str, None], str, Union[Extractor, None]],  # length taken from an earlier named field
    None,  # everything up to the end of the data
]
# Spec = Tuple[..., List[FieldSpec]]
class Buffer:
    """Cursor over a byte string that hands out data from the front."""

    def __init__(self, data: bytes):
        # Bytes that have not been consumed yet.
        self.data = data

    def consume(self, n: int) -> bytes:
        """Remove the first ``n`` bytes from the buffer and return them.

        Args:
            n: Number of bytes to take; must satisfy ``0 <= n <= len(self.data)``.

        Returns:
            The ``n`` leading bytes. ``self.data`` keeps only the remainder.

        Raises:
            AssertionError: If ``n`` is negative or larger than the remaining
                data. The buffer is left unchanged in that case.
        """
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``. The exception type stays
        # AssertionError because Parser.parse_packet catches exactly that.
        # The lower bound matters: a negative ``n`` would otherwise slice
        # from the end and silently desynchronise the cursor.
        if not 0 <= n <= len(self.data):
            raise AssertionError(
                f"cannot consume {n} bytes, {len(self.data)} available"
            )
        head, self.data = self.data[:n], self.data[n:]
        return head
class Packet:
    """Base class for parsed packets; keeps the field values in order."""

    def __init__(self, data: List[Any], **kwargs):
        # Extra keyword arguments are accepted but not used by the base class.
        self.data = data

    def __repr__(self):
        """Render as ``ClassName [v1 v2 ...]`` with one token per field value."""

        def render(value) -> str:
            # Enum members are shown by name, raw bytes as hex, the rest via str().
            if isinstance(value, enum.Enum):
                return value.name
            if isinstance(value, bytes):
                return binascii.hexlify(value).decode()
            return str(value)

        rendered = " ".join(render(value) for value in self.data)
        return f"{self.__class__.__name__} [{rendered}]"
class AmbiguousPacket(Packet):
    """Packet returned by ``Parser.parse_packet`` when more than one spec matches.

    Its ``data`` list holds the raw, unparsed bytes as the only element.
    """
class UnknownPacket(Packet):
    """Packet returned by ``Parser.parse_packet`` when no registered spec matches.

    Its ``data`` list holds the raw, unparsed bytes as the only element.
    """
class Parser:
    """Registry of packet specs that classifies raw data against them."""

    def __init__(self):
        # (packet class, field specs) pairs in registration order.
        self.specs = []

    def register(self, *fields: FieldSpec):
        """Return a class decorator that registers ``fields`` for the class.

        The decorated class is recorded together with ``fields`` and handed
        back unchanged, so it can still be used normally.
        """

        def deco(cls: Type[Packet]):
            self.specs.append((cls, fields))
            return cls

        return deco

    def parse_packet(self, data: bytes) -> Packet:
        """Parse ``data`` with every registered spec and return the result.

        Returns:
            The packet built by the single matching spec,
            ``AmbiguousPacket([data])`` as soon as a second spec matches, or
            ``UnknownPacket([data])`` when none matches.
        """
        match = None
        for cls, fields in self.specs:
            try:
                candidate = _match_spec(cls, fields, data)
            except AssertionError:
                # The spec signals "does not match" with AssertionError.
                continue
            if candidate is None:
                continue
            # Compare against None rather than relying on truthiness: a packet
            # class that defines __len__ or __bool__ may evaluate falsy even
            # though it was parsed successfully.
            if match is not None:
                return AmbiguousPacket([data])
            match = candidate
        if match is None:
            return UnknownPacket([data])
        return match
def _match_spec(cls: Type[Packet], fields: List[FieldSpec], data: bytes) -> Packet:
    """Parse ``data`` field by field according to ``fields`` and build a ``cls``.

    Args:
        cls: Packet class to instantiate on success.
        fields: Field specs, processed in order.
        data: Raw bytes to parse.

    Returns:
        ``cls(values, **named)`` where ``values`` are the field values in
        order and ``named`` maps each named field to its value.

    Raises:
        AssertionError: If a literal does not match, a length is not an
            ``int``, or the data is too short for a field.
        KeyError: If a length backreference names a field that has not been
            parsed yet.
    """
    buffer = Buffer(data)
    named: Dict[str, Any] = {}
    values: List[Any] = []
    for fieldspec in fields:
        if fieldspec is None:
            values.append(buffer.data)
            break  # TODO implement unknown blob can also be in the middle
        if isinstance(fieldspec, bytes):
            # Consume outside of any ``assert``: under ``python -O`` an
            # assert statement is removed together with its side effects,
            # which would leave the literal unconsumed and unchecked.
            actual = buffer.consume(len(fieldspec))
            if actual != fieldspec:
                raise AssertionError(
                    f"expected literal {fieldspec!r}, got {actual!r}"
                )
            values.append(fieldspec)
        elif isinstance(fieldspec, tuple):
            fieldname, fieldlen, extractor = fieldspec
            if isinstance(fieldlen, str):
                # Length backreference: use the value of an earlier named field.
                fieldlen = named[fieldlen]
            # AssertionError is kept as the failure type because the caller
            # treats it as "this spec does not match".
            if not isinstance(fieldlen, int):
                raise AssertionError(
                    f"field length must be an int, got {fieldlen!r}"
                )
            fielddata = buffer.consume(fieldlen)
            if extractor:
                fielddata = extractor(fielddata)
            if fieldname:
                named[fieldname] = fielddata
            values.append(fielddata)
        # Any other kind of spec entry is skipped without consuming data.
    return cls(values, **named)
def unknown(n: int, format: Optional[Extractor] = None) -> FieldSpec:
    """Build the spec for an unnamed field of ``n`` bytes.

    Args:
        n: Number of bytes the field occupies.
        format: Optional extractor stored in the spec; ``None`` means no
            extractor. The name shadows the builtin but is part of the
            public signature, so it is kept.

    Returns:
        The tuple ``(None, n, format)``.
    """
    return (None, n, format)