102 lines
2.0 KiB
Python
102 lines
2.0 KiB
Python
|
import struct
|
||
|
|
||
|
# ---------------------------------------------------------------------------
# struct format codes, kept as bytes so they can be concatenated with a
# byte-order prefix before being handed to struct.pack().
# ---------------------------------------------------------------------------

# Fixed-size scalar codes (signed / unsigned pairs where applicable).
TYPE_CHAR = b"b"      # signed char
TYPE_BYTE = b"B"      # unsigned char
TYPE_BOOL = b"?"      # bool
TYPE_SHORT = b"h"     # signed short
TYPE_USHORT = b"H"    # unsigned short
TYPE_INT = b"i"       # signed int
TYPE_UINT = b"I"      # unsigned int
TYPE_LONG = b"l"      # signed long
TYPE_ULONG = b"L"     # unsigned long
TYPE_LLONG = b"q"     # signed long long
TYPE_ULLONG = b"Q"    # unsigned long long
TYPE_FLOAT = b"f"     # single-precision float
TYPE_DOUBLE = b"d"    # double-precision float

# Variable-length templates: the "%d" placeholder is filled in with the byte
# count of the payload when the final format string is built.
TYPE_STRING = b"B%ds"      # one leading unsigned byte followed by N bytes
TYPE_BYTESTREAM = b"%ds"   # N raw bytes

# Byte-order prefixes understood by the struct module.
BYTEORDER_NATIVE = b"@"
BYTEORDER_LITTLE = b"<"
BYTEORDER_BIG = b">"
|
||
|
|
||
|
def guess_type(value):
    """Return the struct format code best suited to pack *value*.

    The decision is made on the exact type of *value* (subclasses are not
    matched):

    * ``bool``  -> ``TYPE_BOOL``
    * ``float`` -> ``TYPE_FLOAT``
    * ``str`` and ``bytes`` -> ``TYPE_STRING``
    * ``int``   -> the narrowest integer code that can hold the value:
      an unsigned code (``TYPE_BYTE``, ``TYPE_USHORT``, ``TYPE_UINT``,
      ``TYPE_ULLONG``) for values >= 0, a signed code (``TYPE_CHAR``,
      ``TYPE_SHORT``, ``TYPE_INT``, ``TYPE_LLONG``) for negative values.

    Raises:
        Exception: if the type is not supported, or the integer does not
            fit into 8 bytes.
    """
    t_value = type(value)

    # bool is tested before int on purpose: it is the more specific type.
    if t_value is bool:
        return TYPE_BOOL
    if t_value is float:
        return TYPE_FLOAT
    if t_value is str or t_value is bytes:
        return TYPE_STRING

    if t_value is int:
        if value >= 0:
            bit_len = value.bit_length()
            candidates = (
                (1, TYPE_BYTE),
                (2, TYPE_USHORT),
                (4, TYPE_UINT),
                (8, TYPE_ULLONG),
            )
        else:
            # Two's complement: the magnitude bits of ~value plus one sign
            # bit. Shifting a negative int right never reaches zero, so a
            # shift-and-count loop must not be used here.
            bit_len = (~value).bit_length() + 1
            candidates = (
                (1, TYPE_CHAR),
                (2, TYPE_SHORT),
                (4, TYPE_INT),
                (8, TYPE_LLONG),
            )

        # Round the bit count up to whole bytes (8 bits per byte).
        byte_len = (bit_len + 7) // 8
        for max_bytes, type_code in candidates:
            if byte_len <= max_bytes:
                return type_code
        # Integers wider than 8 bytes fall through to the error below.

    raise Exception("Unable to guess type")
|
||
|
|
||
|
class Serializable:
    """A single value together with the struct format code used to pack it.

    Attributes:
        value: the raw Python value to serialize.
        type: a struct format code (one of the module's ``TYPE_*`` constants).
        post_serialized: optional callable applied to the last packed field
            just before packing; ``None`` disables it.
    """

    # Class-level default so the attribute exists even if __init__ is bypassed.
    post_serialized = None

    def __init__(self, value, type=None, post=None):
        """Store *value* and resolve its format code.

        Args:
            value: the value to serialize.
            type: explicit format code; when ``None`` the code is derived
                from *value* with ``guess_type``.
            post: optional callable applied to the last packed field.
        """
        self.value = value

        if type is not None:
            self.type = type
        else:
            # The guess must be based on the value; passing ``type`` here
            # would always hand None to guess_type and raise.
            self.type = guess_type(value)

        self.post_serialized = post

    def _is_variable_length(self):
        """Tell whether the format code carries a ``%d`` length placeholder."""
        return self.type in (TYPE_STRING, TYPE_BYTESTREAM)

    def _struct_fmt_for(self, values):
        """Build the format string matching an already computed value list."""
        if self._is_variable_length():
            return self.type % len(values[-1])
        return self.type

    def get_struct_fmt(self):
        """Return the struct format string (without byte-order prefix).

        For the variable-length codes the length placeholder is filled with
        the byte count of the last field returned by ``get_value``.
        """
        if self._is_variable_length():
            return self._struct_fmt_for(self.get_value())
        return self.type

    def get_value(self):
        """Return the list of fields to hand to ``struct.pack``.

        ``TYPE_STRING`` produces two fields: the constant ``11`` and the
        payload prefixed with a one-byte length. Every other code produces
        the single field ``[self.value]``. When ``post_serialized`` is set it
        is applied to the last field.

        Raises:
            ValueError: if a ``TYPE_STRING`` payload is longer than 255 bytes
                and therefore cannot be described by a one-byte length.
        """
        if self.type == TYPE_STRING:
            payload = self.value
            if isinstance(payload, str):
                # guess_type maps str to TYPE_STRING, so text must be turned
                # into bytes before it can be length-prefixed. UTF-8 is the
                # encoding chosen here.
                payload = payload.encode("utf-8")
            if len(payload) > 255:
                raise ValueError(
                    "string payload too long for a one-byte length prefix: "
                    "%d bytes" % len(payload)
                )
            # NOTE(review): the leading 11 looks like a wire-level type tag;
            # its meaning is not visible in this file - confirm.
            values = [11, bytes([len(payload)]) + payload]
        else:
            values = [self.value]

        if self.post_serialized is not None:
            values[-1] = self.post_serialized(values[-1])

        return values

    def pack(self, byte_order=BYTEORDER_LITTLE):
        """Pack the value into bytes using *byte_order* as format prefix."""
        # Compute the fields once so the format length and the packed data
        # always agree and post_serialized runs a single time per pack.
        values = self.get_value()
        struct_fmt = byte_order + self._struct_fmt_for(values)
        return struct.pack(struct_fmt, *values)
|
||
|
|
||
|
class Serializer:
    """Collects packable items and concatenates their packed bytes.

    Attributes:
        stack: the items added so far, in insertion order.
        byte_order: byte-order prefix passed to every item's ``pack``.
    """

    def __init__(self, byte_order=BYTEORDER_LITTLE):
        """Create an empty serializer using *byte_order* for all items."""
        self.stack = []
        self.byte_order = byte_order

    def add(self, serializable):
        """Append one item, or every item of a list/tuple, to the stack."""
        if isinstance(serializable, (list, tuple)):
            self.stack.extend(serializable)
        else:
            self.stack.append(serializable)

    def flush(self):
        """Return the packed bytes of all items, in insertion order.

        The stack is left untouched, so repeated calls return the same data.
        """
        # Pass the configured byte order on; calling pack() without it would
        # silently fall back to each item's own default.
        return b"".join(s.pack(self.byte_order) for s in self.stack)

    def __bytes__(self):
        """Support ``bytes(serializer)``; equivalent to ``flush()``."""
        return self.flush()
|