import struct TYPE_CHAR = b"b", 1 TYPE_BYTE = b"B", 1 TYPE_BOOL = b"?", 1 TYPE_SHORT = b"h", 2 TYPE_USHORT = b"H", 2 TYPE_INT = b"i", 4 TYPE_UINT = b"I", 4 TYPE_LONG = b"l", 4 TYPE_ULONG = b"L", 4 TYPE_LLONG = b"q", 8 TYPE_ULLONG = b"Q", 8 TYPE_FLOAT = b"f", 4 TYPE_DOUBLE = b"d", 8 TYPE_STRING = b"s", 1 TYPE_ULEB = 1 # Just so we have the enum type~ish BYTEORDER_NATIVE = b"@" BYTEORDER_LITTLE = b"<" BYTEORDER_BIG = b">" def encode_uleb(val): data = b"" while val != 0: b = val & 0x7f val >>= 7 if val != 0: b |= 0x80 data += bytes([b]) return data def decode_uleb(data): val = 0 shift = 0 pos = 0 while True: b = data[pos] pos += 1 val |= (b & 0x7f) << shift if (b & 0x80) == 0: break shift += 7 return val, pos def guess_type(value): t_value = type(value) if t_value is bool: return TYPE_BOOL if t_value is float: return TYPE_FLOAT if t_value is str: return TYPE_STRING if t_value is bytes: return TYPE_STRING if t_value is int: bit_len = 0 while value: value >>= 1 bit_len += 1 byte_len = (bit_len + 1) // 2 if byte_len <= 1: return TYPE_BYTE if byte_len <= 2: return TYPE_USHORT if byte_len <= 4: return TYPE_UINT if byte_len <= 8: return TYPE_ULLONG raise Exception("Unable to guess type") def read(value_type, stream): #if value_type == TYPE_STRING: # read() struct.unpack(value_type[0], stream[:value_type[1]]) class Serializable: length = None prefix = b"" pre_serialized = None def __init__(self, value, type = None, length = None, prefix = b"", pre = None): self.value = value if type is not None: self.type = type else: self.type = guess_type(type) self.length = length self.prefix = prefix self.pre_serialized = pre def get_struct_fmt(self): if self.length is not None: return b"%d%b" % (self.length.value, self.type[0]) return self.type[0] def get_value(self): val = self.value if self.pre_serialized is not None: val[-1] = self.pre_serialized(val[-1]) return val def pack(self, byte_order = BYTEORDER_LITTLE): if self.type == TYPE_ULEB: return encode_uleb(self.value) struct_fmt = byte_order + self.get_struct_fmt() packed = struct.pack(struct_fmt, self.get_value()) if self.prefix is not None: packed = self.prefix + packed return packed def unpack(self, length = -1, byte_order = BYTEORDER_LITTLE): if self.type == TYPE_ULEB: self.value = decode_uleb(2) return if self.type == TYPE_STRING: length = struct_fmt = byte_order + self.get_struct_fmt(length) class Serializer: def __init__(self, byte_order = BYTEORDER_LITTLE): self.stack = [] self.byte_order = byte_order def add(self, serializable): if type(serializable) is list: self.stack += serializable else: self.stack.append(serializable) def flush(self): return b"".join(s.pack() for s in self.stack) def __bytes__(self): return self.flush() class Deserializer: def __init__(self, byte_order = BYTEORDER_LITTLE): self.stack = [] self.byte_order = byte_order def add(self, serializable): if type(serializable) is list: self.stack += serializable else: self.stack.append(serializable) """ def deserialize_data(self, stream): pos = 0 for s in self.stack: struct. s.value """