This commit is contained in:
Kai Vogelgesang 2019-10-16 23:40:32 +02:00
commit a5c52132fe
31 changed files with 751 additions and 0 deletions

217
.gitignore vendored Normal file
View File

@ -0,0 +1,217 @@
# Created by https://www.gitignore.io/api/python,pycharm,virtualenv
# Edit at https://www.gitignore.io/?templates=python,pycharm,virtualenv
### PyCharm ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### PyCharm Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr
# Sonarlint plugin
.idea/**/sonarlint/
# SonarQube Plugin
.idea/**/sonarIssues.xml
# Markdown Navigator plugin
.idea/**/markdown-navigator.xml
.idea/**/markdown-navigator/
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
### VirtualEnv ###
# Virtualenv
# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/
[Bb]in
[Ii]nclude
[Ll]ib
[Ll]ib64
[Ll]ocal
[Ss]cripts
pyvenv.cfg
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
pip-selfcheck.json
# End of https://www.gitignore.io/api/python,pycharm,virtualenv

View File

@ -0,0 +1,57 @@
import struct
from Proto.Vector import VECTOR
class DMPLayer:
ADDRESS_TYPE_AND_DATA_TYPE = 0xA1
FIRST_PROPERTY_ADDRESS = 0x0000
ADDRESS_INCREMENT = 0x0001
PACK_FORMAT = '! H c B H H H'
HEADER_SIZE = struct.calcsize(PACK_FORMAT)
def __init__(self, length: int, property_values: bytes):
assert 0 <= length <= 0xFFF
assert 1 <= len(property_values) <= 513
self.length = length
self.property_values = property_values
def __bytes__(self):
buffer = struct.pack(
self.PACK_FORMAT,
0x7 << 12 | self.length,
VECTOR.DMP_SET_PROPERTY,
self.ADDRESS_TYPE_AND_DATA_TYPE,
self.FIRST_PROPERTY_ADDRESS,
self.ADDRESS_INCREMENT,
len(self.property_values),
)
buffer += self.property_values
return buffer
@classmethod
def from_bytes(cls, buffer, offset=0):
(flags_and_length,
vector,
addres_type_and_data_type,
first_property_address,
address_increment,
property_value_count) = struct.unpack_from(cls.PACK_FORMAT, buffer, offset)
assert flags_and_length >> 12 == 0x7
assert vector == VECTOR.DMP_SET_PROPERTY
assert addres_type_and_data_type == cls.ADDRESS_TYPE_AND_DATA_TYPE
assert first_property_address == cls.FIRST_PROPERTY_ADDRESS
assert address_increment == cls.ADDRESS_INCREMENT
property_value_offset = offset + cls.HEADER_SIZE
length = flags_and_length & 0xFFF
property_values = buffer[property_value_offset:property_value_offset + property_value_count]
return cls(length, property_values)

View File

@ -0,0 +1,66 @@
import struct
from enum import IntFlag
from Proto.Vector import VECTOR
from Proto.util import decode_source_name
class DataPacketFramingLayer:
class Options(IntFlag):
FORCE_SYNCHRONIZATION = 1 << 5
STREAM_TERMINATED = 1 << 6
PREVIEW_DATA = 1 << 7
PACK_FORMAT = '! H 4s 64s B 2s B B 2s'
HEADER_SIZE = struct.calcsize(PACK_FORMAT)
def __init__(self, length: int, source_name: str, priority: int, synchronization_address: bytes,
sequence_number: int, options: Options, universe: bytes):
assert 0 <= length <= 0x0FFF
assert len(source_name) <= 64
assert 0 <= priority <= 200
assert len(synchronization_address) == 2
assert 0 <= sequence_number <= 0xFF
assert len(universe) == 2
self.length = length
self.source_name = source_name
self.priority = priority
self.synchronization_address = synchronization_address
self.sequence_number = sequence_number
self.options = options
self.universe = universe
def __bytes__(self):
return struct.pack(
self.PACK_FORMAT,
0x7 << 12 | self.length,
VECTOR.E131_DATA_PACKET,
self.source_name.encode('utf-8'),
self.priority,
self.synchronization_address,
self.sequence_number,
self.options,
self.universe
)
@classmethod
def from_bytes(cls, buffer: bytes, offset: int = 0):
(flags_and_length,
vector,
source_name,
priority,
synchronization_address,
sequence_number,
options,
universe) = struct.unpack_from(cls.PACK_FORMAT, buffer, offset)
assert flags_and_length >> 12 == 0x7
assert vector == VECTOR.E131_DATA_PACKET
length = flags_and_length & 0xFFF
source_name = decode_source_name(source_name)
options = cls.Options(options)
return cls(length, source_name, priority, synchronization_address, sequence_number, options, universe)

View File

@ -0,0 +1,51 @@
import struct
from Proto.Vector import VECTOR
class RootLayer:
PREAMBLE_SIZE = 0x0010
POSTAMBLE_SIZE = 0x0000
ACN_PACKET_IDENTIFIER = b'ASC-E1.17\x00\x00\x00'
PACK_FORMAT = '! H H 12s H 4s 16s'
HEADER_SIZE = struct.calcsize(PACK_FORMAT)
def __init__(self, length: int, vector: bytes, cid: bytes):
assert 0 <= length <= 0xFFF
assert vector in (VECTOR.ROOT_E131_DATA, VECTOR.ROOT_E131_EXTENDED)
assert len(cid) == 16
self.length = length
self.vector = vector
self.cid = cid
def __bytes__(self):
return struct.pack(
self.PACK_FORMAT,
self.PREAMBLE_SIZE,
self.POSTAMBLE_SIZE,
self.ACN_PACKET_IDENTIFIER,
0x7 << 12 | self.length,
self.vector,
self.cid
)
@classmethod
def from_bytes(cls, buffer: bytes, offset: int = 0):
(preamble_size,
postamble_size,
acn_packet_identifier,
flags_and_length,
vector,
cid) = struct.unpack_from(cls.PACK_FORMAT, buffer, offset)
assert preamble_size == cls.PREAMBLE_SIZE
assert postamble_size == cls.POSTAMBLE_SIZE
assert acn_packet_identifier == cls.ACN_PACKET_IDENTIFIER
assert flags_and_length >> 12 == 0x7
length = flags_and_length & 0xFFF
return cls(length, vector, cid)

View File

@ -0,0 +1,41 @@
import struct
from Proto.Vector import VECTOR
class SynchronizationPacketFramingLayer:
PACK_FORMAT = '! H 4s B 2s 2x'
HEADER_SIZE = struct.calcsize(PACK_FORMAT)
def __init__(self, length: int, sequence_number: int, synchronization_address: bytes):
assert 0 <= length <= 0xFFF
assert 0 <= sequence_number <= 0xFF
assert len(synchronization_address) == 2
self.length = length
self.sequence_number = sequence_number
self.synchronization_address = synchronization_address
def __bytes__(self):
return struct.pack(
self.PACK_FORMAT,
0x7 << 12 | self.length,
VECTOR.E131_EXTENDED_SYNCHRONIZATION,
self.sequence_number,
self.synchronization_address,
)
@classmethod
def from_bytes(cls, buffer, offset=0):
(flags_and_length,
vector,
sequence_number,
synchronization_address) = struct.unpack_from(cls.PACK_FORMAT, buffer, offset)
assert flags_and_length >> 12 == 0x7
assert vector == VECTOR.E131_EXTENDED_SYNCHRONIZATION
length = flags_and_length & 0xFFF
return cls(length, sequence_number, synchronization_address)

View File

@ -0,0 +1,58 @@
import struct
from typing import List
from Proto.Vector import VECTOR
class UniverseDiscoveryLayer:
PACK_FORMAT = '! H 4s B B'
HEADER_SIZE = struct.calcsize(PACK_FORMAT)
def __init__(self, length: int, page: int, last_page: int, universe_list: List[bytes]):
assert 0 <= length <= 0xFFF
assert 0 <= page <= 0xFF
assert 0 <= last_page <= 0xFF
assert len(universe_list) <= 512
for universe in universe_list:
assert len(universe) == 2
self.length = length
self.page = page
self.last_page = last_page
self.universe_list = universe_list
def __bytes__(self):
buffer = struct.pack(
self.PACK_FORMAT,
0x7 << 12 | self.length,
VECTOR.UNIVERSE_DISCOVERY_UNIVERSE_LIST,
self.page,
self.last_page
)
buffer += b''.join(self.universe_list)
return buffer
@classmethod
def from_bytes(cls, buffer: bytes, offset: int = 0):
(flags_and_length,
vector,
page,
last_page) = struct.unpack_from(cls.PACK_FORMAT, buffer, offset)
assert flags_and_length >> 12 == 0x7
assert vector == VECTOR.UNIVERSE_DISCOVERY_UNIVERSE_LIST
universe_list_offset = offset + cls.HEADER_SIZE
length = flags_and_length & 0xFFF
universe_list = []
assert len(buffer) >= offset + length
for i in range(universe_list_offset, offset + length, 2):
universe_list.append(buffer[i:i+2])
return cls(length, page, last_page, universe_list)

View File

@ -0,0 +1,39 @@
import struct
from Proto.Vector import VECTOR
from Proto.util import decode_source_name
class UniverseDiscoveryPacketFramingLayer:
PACK_FORMAT = '! H 4s 64s 4x'
HEADER_SIZE = struct.calcsize(PACK_FORMAT)
def __init__(self, length: int, source_name: str):
assert 0 <= length <= 0xFFF
assert len(source_name) <= 64
self.length = length
self.source_name = source_name
def __bytes__(self):
return struct.pack(
self.PACK_FORMAT,
0x7 << 12 | self.length,
VECTOR.E131_EXTENDED_DISCOVERY,
self.source_name.encode('utf-8')
)
@classmethod
def from_bytes(cls, buffer, offset):
(flags_and_length,
vector,
source_name) = struct.unpack_from(cls.PACK_FORMAT, buffer, offset)
assert flags_and_length >> 12 == 0x7
assert vector == VECTOR.E131_EXTENDED_DISCOVERY
length = flags_and_length & 0xFFF
source_name = decode_source_name(source_name)
return cls(length, source_name)

View File

@ -0,0 +1,8 @@
from .RootLayer import RootLayer
from .DataPacketFramingLayer import DataPacketFramingLayer
from .SynchronizationPacketFramingLayer import SynchronizationPacketFramingLayer
from .UniverseDiscoveryPacketFramingLayer import UniverseDiscoveryPacketFramingLayer
from .DMPLayer import DMPLayer
from .UniverseDiscoveryLayer import UniverseDiscoveryLayer

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,49 @@
from Proto.Layers import RootLayer, DataPacketFramingLayer, DMPLayer
from Proto.Vector import VECTOR
class DataPacket:
def __init__(self, cid: bytes, source_name: str, priority: int, synchronization_address: bytes,
sequence_number: int, options: DataPacketFramingLayer.Options, universe: bytes,
property_values: bytes):
assert len(cid) == 16
assert len(source_name) <= 64
assert 0 <= priority <= 100
assert len(synchronization_address) == 2
assert 0 <= sequence_number <= 0xFF
assert len(universe) == 2
assert 1 <= len(property_values) <= 513
self.cid = cid
self.source_name = source_name
self.priority = priority
self.synchronization_address = synchronization_address
self.sequence_number = sequence_number
self.options = options
self.universe = universe
self.property_values = property_values
def __bytes__(self):
dmp_layer = DMPLayer(
DMPLayer.HEADER_SIZE + len(self.property_values),
self.property_values
)
data_packet_framing_layer = DataPacketFramingLayer(
DataPacketFramingLayer.HEADER_SIZE + dmp_layer.length,
self.source_name, self.priority,
self.synchronization_address, self.sequence_number,
self.options, self.universe
)
root_layer = RootLayer(
RootLayer.HEADER_SIZE + data_packet_framing_layer.length,
VECTOR.ROOT_E131_DATA, self.cid
)
return bytes(root_layer) + bytes(data_packet_framing_layer) + bytes(dmp_layer)
def __repr__(self):
return f'{self.__class__.__name__}({self.cid!r}, {self.source_name!r}, {self.priority!r}, ' \
f'{self.synchronization_address!r}, {self.sequence_number!r}, {self.options!r}, {self.universe!r}, ' \
f'{self.property_values!r})'

View File

@ -0,0 +1,66 @@
import struct
from Proto.Layers import *
from Proto.Packets import *
from Proto.Vector import VECTOR
def parse_packet(buffer: bytes):
root_layer = RootLayer.from_bytes(buffer)
if root_layer.vector == VECTOR.ROOT_E131_DATA:
# Definitely a Data Packet
data_packet_framing_layer = DataPacketFramingLayer.from_bytes(
buffer, RootLayer.HEADER_SIZE
)
dmp_layer = DMPLayer.from_bytes(
buffer, RootLayer.HEADER_SIZE + DataPacketFramingLayer.HEADER_SIZE
)
return DataPacket(
root_layer.cid,
data_packet_framing_layer.source_name,
data_packet_framing_layer.priority,
data_packet_framing_layer.synchronization_address,
data_packet_framing_layer.sequence_number,
data_packet_framing_layer.options,
data_packet_framing_layer.universe,
dmp_layer.property_values
)
else:
# Either Synchronization or Universe Discovery, check vector of next frame
(vector,) = struct.unpack_from('! 2x 4s', buffer, RootLayer.HEADER_SIZE)
assert vector in (VECTOR.E131_EXTENDED_SYNCHRONIZATION, VECTOR.E131_EXTENDED_DISCOVERY)
if vector == VECTOR.E131_EXTENDED_SYNCHRONIZATION:
synchronization_packet_framing_layer = SynchronizationPacketFramingLayer.from_bytes(
buffer, RootLayer.HEADER_SIZE
)
return SynchronizationPacket(
root_layer.cid,
synchronization_packet_framing_layer.sequence_number,
synchronization_packet_framing_layer.synchronization_address
)
else: # VECTOR.E131_EXTENDED_DISCOVERY
universe_discovery_packet_framing_layer = UniverseDiscoveryPacketFramingLayer.from_bytes(
buffer, RootLayer.HEADER_SIZE
)
universe_discovery_layer = UniverseDiscoveryLayer.from_bytes(
buffer, RootLayer.HEADER_SIZE + UniverseDiscoveryPacketFramingLayer.HEADER_SIZE
)
return UniverseDiscoveryPacket(
root_layer.cid,
universe_discovery_packet_framing_layer.source_name,
universe_discovery_layer.page,
universe_discovery_layer.last_page,
universe_discovery_layer.universe_list
)

View File

@ -0,0 +1,31 @@
from Proto.Layers import RootLayer, SynchronizationPacketFramingLayer
from Proto.Vector import VECTOR
class SynchronizationPacket:
def __init__(self, cid: bytes, sequence_number: int, synchronization_address: bytes):
assert len(cid) == 16
assert 0 <= sequence_number <= 0xFF
assert len(synchronization_address) == 2
self.cid = cid
self.sequence_number = sequence_number
self.synchronization_address = synchronization_address
def __bytes__(self):
framing_layer = SynchronizationPacketFramingLayer(
SynchronizationPacketFramingLayer.HEADER_SIZE,
self.sequence_number,
self.synchronization_address
)
root_layer = RootLayer(
RootLayer.HEADER_SIZE + framing_layer.length,
VECTOR.ROOT_E131_EXTENDED,
self.cid
)
return bytes(root_layer) + bytes(framing_layer)
def __repr__(self):
return f'{self.__class__.__name__}({self.cid}, {self.sequence_number}, {self.synchronization_address})'

View File

@ -0,0 +1,46 @@
from typing import List
from Proto.Layers import RootLayer, UniverseDiscoveryPacketFramingLayer, UniverseDiscoveryLayer
from Proto.Vector import VECTOR
class UniverseDiscoveryPacket:
def __init__(self, cid: bytes, source_name: str, page: int, last_page: int, universe_list: List[bytes]):
assert len(cid) == 16
assert len(source_name) <= 64
assert 0 <= page <= 0xFF
assert 0 <= last_page <= 0xFF
assert len(universe_list) <= 512
for universe in universe_list:
assert len(universe) == 2
self.cid = cid
self.source_name = source_name
self.page = page
self.last_page = last_page
self.universe_list = universe_list
def __bytes__(self):
universe_discovery_layer = UniverseDiscoveryLayer(
UniverseDiscoveryLayer.HEADER_SIZE + 2 * len(self.universe_list),
self.page,
self.last_page,
self.universe_list
)
universe_discovery_packet_framing_layer = UniverseDiscoveryPacketFramingLayer(
UniverseDiscoveryPacketFramingLayer.HEADER_SIZE + universe_discovery_layer.length,
self.source_name
)
root_layer = RootLayer(
RootLayer.HEADER_SIZE + universe_discovery_packet_framing_layer.length,
VECTOR.ROOT_E131_EXTENDED,
self.cid
)
return bytes(root_layer) + bytes(universe_discovery_packet_framing_layer) + bytes(universe_discovery_layer)
def __repr__(self):
return f'{self.__class__.__name__}({self.cid!r}, {self.source_name!r}, {self.page!r}, {self.last_page!r}, ' \
f'{self.universe_list!r})'

View File

@ -0,0 +1,5 @@
from .DataPacket import DataPacket
from .SynchronizationPacket import SynchronizationPacket
from .UniverseDiscoveryPacket import UniverseDiscoveryPacket
from .Parser import parse_packet

Binary file not shown.

Binary file not shown.

15
src/Proto/Vector.py Normal file
View File

@ -0,0 +1,15 @@
from enum import Enum
class VECTOR(bytes, Enum):
ROOT_E131_DATA = bytes([0x00, 0x00, 0x00, 0x04])
ROOT_E131_EXTENDED = bytes([0x00, 0x00, 0x00, 0x08])
DMP_SET_PROPERTY = bytes([0x02])
E131_DATA_PACKET = bytes([0x00, 0x00, 0x00, 0x02])
E131_EXTENDED_SYNCHRONIZATION = bytes([0x00, 0x00, 0x00, 0x01])
E131_EXTENDED_DISCOVERY = bytes([0x00, 0x00, 0x00, 0x02])
UNIVERSE_DISCOVERY_UNIVERSE_LIST = bytes([0x00, 0x00, 0x00, 0x01])

0
src/Proto/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

2
src/Proto/util.py Normal file
View File

@ -0,0 +1,2 @@
def decode_source_name(buffer: bytes):
return buffer.split(b'\x00')[0].decode('utf-8')