This commit is contained in:
Kai Vogelgesang 2020-10-11 05:40:56 +02:00
parent 5e6b6a299b
commit ace05e8d0a
3 changed files with 92 additions and 31 deletions

View File

@ -33,11 +33,23 @@ def handle2(tag: str, data: str):
print(f"[{tag}] {data}")
ignore = [
proto.AckPacket,
proto.PingPacket,
proto.FinPacket,
]
def handle(tag: str, data: str):
if tag == "stdin":
print(f"# {data}")
print(f"{Color.GRAY}# {data}{Color.RESET}")
return
d_bytes = binascii.unhexlify(data)
pkt = proto.Parser.parse_packet(d_bytes)
print(pkt)
for packet_type in ignore:
if isinstance(pkt, packet_type):
return
print(f"[{Color.BLUE if tag == 'server' else Color.GREEN}{tag}{Color.RESET}] {pkt}")

View File

@ -2,6 +2,7 @@ import enum
import binascii
import protoparser
from protoparser import Packet
class HazelPacketType(enum.IntEnum):
@ -22,14 +23,40 @@ def int_big_endian(data: bytes) -> int:
Parser = protoparser.Parser()
Parser.register("Ping", HazelPacketType.PING, ("nonce", 2, int_big_endian))
Parser.register("Ack", HazelPacketType.ACK, ("nonce", 2, int_big_endian), 0xFF)
Parser.register("Fin", HazelPacketType.FIN)
Parser.register(
"Hello",
@Parser.register(HazelPacketType.PING, ("nonce", 2, int_big_endian))
class PingPacket(Packet):
def __init__(self, data, nonce):
self.nonce = nonce
super().__init__(data)
def __repr__(self):
return f"Ping {self.nonce}"
@Parser.register(HazelPacketType.ACK, ("nonce", 2, int_big_endian), 0xFF)
class AckPacket(Packet):
def __init__(self, data, nonce):
self.nonce = nonce
super().__init__(data)
def __repr__(self):
return f"Ack {self.nonce}"
@Parser.register(HazelPacketType.FIN)
class FinPacket(Packet):
def __repr__(self):
return "Fin"
@Parser.register(
HazelPacketType.HELLO,
(None, 7, None),
("name_len", 1, int_big_endian),
("name", "name_len", bytes.decode),
)
class HelloPacket(Packet):
def __init__(self, data, name, **kwargs):
self.name = name
super().__init__(data)

View File

@ -1,4 +1,6 @@
from typing import Tuple, Dict, List, Union, Callable, Optional, Any
import binascii
import enum
from typing import Tuple, Dict, List, Union, Callable, Optional, Any, Type
Extractor = Callable[[bytes], Any]
@ -18,7 +20,7 @@ FieldSpec = Union[
None,
]
Spec = Tuple[str, List[FieldSpec]]
# Spec = Tuple[..., List[FieldSpec]]
class Buffer:
@ -31,55 +33,76 @@ class Buffer:
return result
class Packet:
def __init__(self, data: List[Any], **kwargs):
del kwargs # unused
self.data = data
def __repr__(self):
res = list()
for item in self.data:
if isinstance(item, enum.Enum):
res.append(item.name)
elif isinstance(item, bytes):
res.append(binascii.hexlify(item).decode())
else:
res.append(str(item))
return f"{self.__class__.__name__} [{' '.join(res)}]"
class AmbiguousPacket(Packet):
pass
class UnknownPacket(Packet):
pass
class Parser:
def __init__(self):
self.specs = list()
def register(self, name: str, *fields: FieldSpec):
self.specs.append((name, fields))
def register(self, *fields: FieldSpec):
def deco(cls: Type[Packet]):
self.specs.append((cls, fields))
return cls
def parse_packet(self, data: bytes):
return deco
def parse_packet(self, data: bytes) -> Packet:
result = None
for spec in self.specs:
for (cls, fields) in self.specs:
try:
m = self.match_spec(spec, data)
m = self.match_spec(cls, fields, data)
except AssertionError:
continue
if m:
if result:
return {"type": "ambiguous", "data": [data]}
return AmbiguousPacket([data])
result = m
if not result:
return {"type": "unknown", "data": [data]}
return UnknownPacket([data])
return result
@staticmethod
def match_spec(spec: Spec, data: bytes):
def match_spec(cls: Type[Packet], fields: List[FieldSpec], data: bytes) -> Packet:
buffer = Buffer(data)
backref: Dict[str, Any] = dict()
typename, fields = spec
result = {
"type": typename,
"data": list(),
}
res_data: List[Any] = list()
for fieldspec in fields:
if isinstance(fieldspec, int):
assert buffer.consume(1)[0] == fieldspec
# ???
result["data"].append(fieldspec) # type: ignore
res_data.append(fieldspec)
continue
if fieldspec is None:
result["data"].append(buffer.data) # type: ignore
res_data.append(buffer.data)
break # TODO implement unknown blob can also be in the middle
if isinstance(fieldspec, tuple):
@ -99,7 +122,6 @@ class Parser:
if fieldname:
backref[fieldname] = fielddata
result["data"].append(fielddata) # type: ignore
res_data.append(fielddata)
result["fields"] = backref # type: ignore
return result
return cls(res_data, **backref)