ManInTheMiddleAmongUs/proto/parser.py
2020-10-20 02:46:01 +02:00

120 lines
2.9 KiB
Python

from dataclasses import dataclass
from typing import *
from . import fieldtype
# Type variable for the payload type stored in FieldData.data.
T = TypeVar("T")
@dataclass
class FieldData(Generic[T]):
    """One decoded field: an optional name, its value, and a formatter.

    ``str()`` renders the value by calling ``formatter`` on ``data``;
    ``repr()`` wraps that rendering together with the field name.
    """

    name: Optional[str]  # None marks an unnamed field
    data: T
    formatter: fieldtype.Formatter[T]

    def __str__(self):
        """Return the value rendered by this field's formatter."""
        render = self.formatter
        return render(self.data)

    def __repr__(self):
        """Return a debug form showing the name (if any) and rendered value."""
        # A falsy name (None or empty string) is shown as "(unnamed)".
        label = repr(self.name) if self.name else "(unnamed)"
        return f"<Field {label} ({self})>"
@dataclass
class Packet:
    """Base class for parsed packets: an ordered list of decoded fields."""

    fields: List[FieldData]
class UnknownPacket(Packet):
    """Packet whose fields are shown with an "Unknown" prefix."""

    def __str__(self):
        """Return "Unknown " followed by every field's string form, unseparated."""
        rendered = "".join(map(str, self.fields))
        return "Unknown " + rendered
@dataclass
class AmbiguousPacket(Packet):
    """Packet carrying, besides its own fields, a list of candidate packets."""

    candidates: List[Packet]
class Buffer:
    """Byte string that is read destructively from the front."""

    def __init__(self, data: bytes):
        """Wrap ``data``; ``self.data`` always holds the unread remainder."""
        self.data = data

    def consume(self, n: int) -> bytes:
        """Remove and return the first ``n`` bytes of the remaining data.

        Raises:
            AssertionError: if ``n`` is negative or larger than the number
                of bytes left. The buffer is left unchanged in that case.
        """
        # Raised explicitly instead of using an ``assert`` statement so the
        # check is not stripped under ``python -O``. A negative ``n`` is
        # rejected too: slicing with it would cut from the end of the data.
        if not 0 <= n <= len(self.data):
            raise AssertionError(
                f"cannot consume {n} bytes, {len(self.data)} available"
            )
        result, self.data = self.data[:n], self.data[n:]
        return result
class FieldGetter:
    """Callable that returns the ``data`` of the field at a fixed index."""

    def __init__(self, idx):
        """Remember the index into ``obj.fields`` to read from."""
        self.idx = idx

    def __call__(self, obj):
        """Return ``obj.fields[self.idx].data``."""
        selected = obj.fields[self.idx]
        return selected.data
class MetaParser:
    """Registry of packet classes that tries each one against raw bytes."""

    def __init__(self):
        """Start with no registered packet classes."""
        self.registered = []

    def register(self, cls):
        """Register a packet class and return it unchanged.

        ``cls`` must be a ``Packet`` subclass with a ``Fields`` attribute.
        For every named entry in ``Fields`` a read-only property of that
        name is installed on ``cls``, returning the data of the field at
        the same position.
        """
        assert issubclass(cls, Packet)
        assert hasattr(cls, "Fields")
        for position, spec in enumerate(cls.Fields):
            attr = spec.name
            if attr is not None:
                # Refuse to shadow an attribute the class already has.
                assert not hasattr(cls, attr)
                setattr(cls, attr, property(FieldGetter(position)))
        self.registered.append(cls)
        return cls

    def _try_parse(
        self, data: bytes, cls: Type[Packet], fields: List[fieldtype.FieldType]
    ) -> Packet:
        """Decode ``data`` field by field and build a ``cls`` instance.

        Each field consumes ``length`` bytes from the front of the data. A
        ``fieldtype.Backref`` length is resolved from the integer values of
        named fields decoded earlier. Exceptions from the lookup, the buffer
        or the extractor propagate to the caller.
        """
        remaining = Buffer(data)
        # Integer values of named fields seen so far, keyed by field name.
        seen_ints: Dict[str, int] = {}
        decoded: List[FieldData] = []
        for spec in fields:
            size = spec.length
            if isinstance(size, fieldtype.Backref):
                # NOTE(review): the Backref object itself is used as the key;
                # presumably it compares equal to a field name - confirm.
                size = seen_ints[size]
            raw = remaining.consume(size)
            value = spec.extractor(raw)  # type: ignore
            decoded.append(
                FieldData(spec.name, value, spec.formatter)  # type: ignore
            )
            if spec.name is not None and isinstance(value, int):
                seen_ints[spec.name] = value
        return cls(decoded)

    def parse(self, data: bytes):
        """Parse ``data`` against every registered class.

        Returns the single matching packet, an ``UnknownPacket`` when no
        class parses the data, or an ``AmbiguousPacket`` holding all
        matches when more than one does. A class counts as not matching
        when parsing raises KeyError, ValueError or AssertionError.
        """
        matches = []
        for cls in self.registered:
            try:
                matches.append(self._try_parse(data, cls, cls.Fields))
            except (KeyError, ValueError, AssertionError):
                continue
        if len(matches) == 1:
            return matches[0]
        # Both fallback results carry the raw bytes as one unnamed field.
        raw_fields = [FieldData(None, data, fieldtype.default_bytes_formatter)]
        if not matches:
            return UnknownPacket(raw_fields)
        return AmbiguousPacket(raw_fields, matches)