import struct TYPE_CHAR = b"b", 1 TYPE_BYTE = b"B", 1 TYPE_BOOL = b"?", 1 TYPE_SHORT = b"h", 2 TYPE_USHORT = b"H", 2 TYPE_INT = b"i", 4 TYPE_UINT = b"I", 4 TYPE_LONG = b"l", 4 TYPE_ULONG = b"L", 4 TYPE_LLONG = b"q", 8 TYPE_ULLONG = b"Q", 8 TYPE_FLOAT = b"f", 4 TYPE_DOUBLE = b"d", 8 TYPE_STRING = b"s", 1 TYPE_ULEB = b"s", 1, True # True just so they are different tuples BYTEORDER_NATIVE = b"@" BYTEORDER_LITTLE = b"<" BYTEORDER_BIG = b">" def encode_uleb(val): data = b"" while val != 0: b = val & 0x7f val >>= 7 if val != 0: b |= 0x80 data += bytes([b]) return data def decode_uleb(data): val = 0 shift = 0 pos = 0 while True: b = data[pos] pos += 1 val |= (b & 0x7f) << shift if (b & 0x80) == 0: break shift += 7 return val, pos def guess_type(value): t_value = type(value) if t_value is bool: return TYPE_BOOL if t_value is float: return TYPE_FLOAT if t_value is str: return TYPE_STRING if t_value is bytes: return TYPE_STRING if t_value is int: bit_len = 0 while value: value >>= 1 bit_len += 1 byte_len = (bit_len + 1) // 2 if byte_len <= 1: return TYPE_BYTE if byte_len <= 2: return TYPE_USHORT if byte_len <= 4: return TYPE_UINT if byte_len <= 8: return TYPE_ULLONG raise Exception("Unable to guess type") def read(value_type, stream): #if value_type == TYPE_STRING: # read() struct.unpack(value_type[0], stream[:value_type[1]]) class Serializable: length = None prefix = b"" filter = None def __init__(self, value, type = None, length = None, prefix = b"", filter = None): self.value = value if type is not None: self.type = type else: self.type = guess_type(type) self.length = length self.prefix = prefix self.filter = filter def get_struct_fmt(self, length = 1, uleb_length_override = -1): struct_fmt = self.type[0] if length > 1: struct_fmt = b"%d%b" % (length, self.type[0]) if self.length is not None: if self.length == TYPE_ULEB: struct_fmt = b"%d%b%b" % (len(self.get_value()[0]) if uleb_length_override == -1 else uleb_length_override, self.length[0], struct_fmt) else: struct_fmt = b"%b%b" % (self.length[0], struct_fmt) return struct_fmt def get_value(self): val = [self.value] if self.filter is not None: val[-1] = self.filter(val[-1], 0) if self.length is not None: if self.length == TYPE_ULEB: val.insert(0, encode_uleb(len(val[-1]))) else: val.insert(0, len(val[-1])) return val def pack(self, byte_order = BYTEORDER_LITTLE): length = 1 if self.type == TYPE_STRING: length = len(self.get_value()[-1]) struct_fmt = byte_order + self.get_struct_fmt(length) packed = struct.pack(struct_fmt, *self.get_value()) if self.prefix is not None: packed = self.prefix + packed return packed def unpack(self, stream, byte_order = BYTEORDER_LITTLE): length = 1 read_pos = 0 extra_read = 0 remove_struct_fmt = [] if self.type == TYPE_STRING: if self.length == TYPE_ULEB: extra_read = 1 read_pos += 1 # Skip 0x0b length, _read_pos = decode_uleb(stream[read_pos:]) read_pos += _read_pos - 1 else: l = Serializable(0, self.length) read_pos = l.unpack(stream) length = l.get_value()[-1] remove_struct_fmt.append(0) struct_fmt = byte_order + self.get_struct_fmt(length, read_pos) length += extra_read if len(remove_struct_fmt): for rem in remove_struct_fmt: struct_fmt = struct_fmt[:rem + 1] + struct_fmt[rem + 2:] data = struct.unpack(struct_fmt, stream[read_pos:read_pos + self.type[1] * length]) if self.filter is not None: data = list(data) data[-1] = self.filter(data[-1], 1) self.value = data[-1] return read_pos + self.type[1] * length class Serializer: def __init__(self, byte_order = BYTEORDER_LITTLE): self.stack = [] self.byte_order = byte_order def add(self, serializable): if type(serializable) is list: self.stack += serializable else: self.stack.append(serializable) def serialize(self): return b"".join(s.pack() for s in self.stack) def deserialize(self, stream): for s in self.stack: stream = stream[s.unpack(stream):] def flush(self): return b"".join(s.pack() for s in self.stack) def __bytes__(self): return self.flush()