Compare commits

...

3 Commits

Author SHA1 Message Date
1a0c63872c Deserialize (load) 2019-03-19 17:25:05 +01:00
ba738e395d Rewrote serializer type length 2019-03-19 14:10:14 +01:00
12ea8c4d1b Rewrite start 2019-03-18 08:00:18 +01:00
4 changed files with 151 additions and 75 deletions

View File

@ -35,6 +35,6 @@ class ReplayFrame:
self._float_convert(self.x), self._float_convert(self.x),
self._float_convert(self.y), self._float_convert(self.y),
self.buttons) self.buttons)
def __bytes__(self): def __bytes__(self):
return str(self).encode() return str(self).encode()

View File

@ -1,20 +1,21 @@
import struct import struct
TYPE_CHAR = b"b" TYPE_CHAR = b"b", 1
TYPE_BYTE = b"B" TYPE_BYTE = b"B", 1
TYPE_BOOL = b"?" TYPE_BOOL = b"?", 1
TYPE_SHORT = b"h" TYPE_SHORT = b"h", 2
TYPE_USHORT = b"H" TYPE_USHORT = b"H", 2
TYPE_INT = b"i" TYPE_INT = b"i", 4
TYPE_UINT = b"I" TYPE_UINT = b"I", 4
TYPE_LONG = b"l" TYPE_LONG = b"l", 4
TYPE_ULONG = b"L" TYPE_ULONG = b"L", 4
TYPE_LLONG = b"q" TYPE_LLONG = b"q", 8
TYPE_ULLONG = b"Q" TYPE_ULLONG = b"Q", 8
TYPE_FLOAT = b"f" TYPE_FLOAT = b"f", 4
TYPE_DOUBLE = b"d" TYPE_DOUBLE = b"d", 8
TYPE_STRING = b"B%ds" TYPE_STRING = b"s", 1
TYPE_BYTESTREAM = b"%ds"
TYPE_ULEB = b"s", 1, True # True just so they are different tuples
BYTEORDER_NATIVE = b"@" BYTEORDER_NATIVE = b"@"
BYTEORDER_LITTLE = b"<" BYTEORDER_LITTLE = b"<"
@ -22,7 +23,6 @@ BYTEORDER_BIG = b">"
def encode_uleb(val): def encode_uleb(val):
data = b"" data = b""
pos = 0
while val != 0: while val != 0:
b = val & 0x7f b = val & 0x7f
val >>= 7 val >>= 7
@ -73,10 +73,18 @@ def guess_type(value):
raise Exception("Unable to guess type") raise Exception("Unable to guess type")
class Serializable: def read(value_type, stream):
post_serialized = None #if value_type == TYPE_STRING:
# read()
def __init__(self, value, type = None, post = None): struct.unpack(value_type[0], stream[:value_type[1]])
class Serializable:
length = None
prefix = b""
filter = None
def __init__(self, value, type = None, length = None, prefix = b"", filter = None):
self.value = value self.value = value
if type is not None: if type is not None:
@ -84,29 +92,76 @@ class Serializable:
else: else:
self.type = guess_type(type) self.type = guess_type(type)
self.post_serialized = post self.length = length
self.prefix = prefix
self.filter = filter
def get_struct_fmt(self): def get_struct_fmt(self, length = 1, uleb_length_override = -1):
if self.type in [TYPE_STRING, TYPE_BYTESTREAM]: struct_fmt = self.type[0]
return self.type % len(self.get_value()[-1]) if length > 1:
return self.type struct_fmt = b"%d%b" % (length, self.type[0])
if self.length is not None:
if self.length == TYPE_ULEB:
struct_fmt = b"%d%b%b" % (len(self.get_value()[0]) if uleb_length_override == -1 else uleb_length_override, self.length[0], struct_fmt)
else:
struct_fmt = b"%b%b" % (self.length[0], struct_fmt)
return struct_fmt
def get_value(self): def get_value(self):
def _get_value(): val = [self.value]
if self.type == TYPE_STRING: if self.filter is not None:
return [0x0b, encode_uleb(len(self.value)) + self.value] val[-1] = self.filter(val[-1], 0)
return [self.value] if self.length is not None:
if self.length == TYPE_ULEB:
val = _get_value() val.insert(0, encode_uleb(len(val[-1])))
if self.post_serialized is not None: else:
val[-1] = self.post_serialized(val[-1]) val.insert(0, len(val[-1]))
return val return val
def pack(self, byte_order = BYTEORDER_LITTLE): def pack(self, byte_order = BYTEORDER_LITTLE):
struct_fmt = byte_order + self.get_struct_fmt() length = 1
#print(struct_fmt, self.get_value()) if self.type == TYPE_STRING:
return struct.pack(struct_fmt, *self.get_value()) length = len(self.get_value()[-1])
struct_fmt = byte_order + self.get_struct_fmt(length)
packed = struct.pack(struct_fmt, *self.get_value())
if self.prefix is not None:
packed = self.prefix + packed
return packed
def unpack(self, stream, byte_order = BYTEORDER_LITTLE):
length = 1
read_pos = 0
extra_read = 0
remove_struct_fmt = []
if self.type == TYPE_STRING:
if self.length == TYPE_ULEB:
extra_read = 1
read_pos += 1 # Skip 0x0b
length, _read_pos = decode_uleb(stream[read_pos:])
read_pos += _read_pos - 1
else:
l = Serializable(0, self.length)
read_pos = l.unpack(stream)
length = l.get_value()[-1]
remove_struct_fmt.append(0)
struct_fmt = byte_order + self.get_struct_fmt(length, read_pos)
length += extra_read
if len(remove_struct_fmt):
for rem in remove_struct_fmt:
struct_fmt = struct_fmt[:rem + 1] + struct_fmt[rem + 2:]
data = struct.unpack(struct_fmt, stream[read_pos:read_pos + self.type[1] * length])
if self.filter is not None:
data = list(data)
data[-1] = self.filter(data[-1], 1)
self.value = data[-1]
return read_pos + self.type[1] * length
class Serializer: class Serializer:
def __init__(self, byte_order = BYTEORDER_LITTLE): def __init__(self, byte_order = BYTEORDER_LITTLE):
@ -119,6 +174,13 @@ class Serializer:
else: else:
self.stack.append(serializable) self.stack.append(serializable)
def serialize(self):
return b"".join(s.pack() for s in self.stack)
def deserialize(self, stream):
for s in self.stack:
stream = stream[s.unpack(stream):]
def flush(self): def flush(self):
return b"".join(s.pack() for s in self.stack) return b"".join(s.pack() for s in self.stack)

View File

@ -1,6 +1,6 @@
import struct import struct
import string import string
#import lzma import lzma
import pylzma # Required cause pythons standard library lzma has missing support import pylzma # Required cause pythons standard library lzma has missing support
from .frame import ReplayFrame from .frame import ReplayFrame
@ -18,10 +18,8 @@ from hashlib import md5 as _md5
import traceback import traceback
def md5_str(data): def md5_str(data):
print(data)
if type(data) is str: if type(data) is str:
data = data.encode("ascii") data = data.encode("ascii")
print(_md5(data).hexdigest().encode())
return _md5(data).hexdigest().encode() return _md5(data).hexdigest().encode()
def md5_file(file): def md5_file(file):
@ -36,39 +34,51 @@ def md5_file(file):
hash.update(chunk) hash.update(chunk)
return hash.hexdigest() return hash.hexdigest()
def append_and_compress(data): def append_and_compress(data, mode):
return lzma_compress(data + b"-12345|0|0|1337,") if mode == 0:
return lzma_compress(data + b"-12345|0|0|1337,")
else:
return lzma_decompress(data)[:-len(b"-12345|0|0|1337,")]
def lzma_compress(data): def lzma_compress(data):
comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False) comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False)
comp = comp[:5] + struct.pack(b"<Q", len(data)) + comp[5:] # Append uncompressed length comp = comp[:5] + struct.pack(b"<Q", len(data)) + comp[5:] # Append uncompressed length
return struct.pack("<I", len(comp)) + comp # For some odd ass reason this doesnt use ULEB return comp
def lzma_decompress(data):
comp = lzma.decompress(data)
return comp
class Replay: class Replay:
_mode = Serializable(osuModes.STANDARD, TYPE_BYTE) _mode = Serializable(osuModes.STANDARD, TYPE_BYTE)
_osu_version = Serializable(20181216, TYPE_INT) _osu_version = Serializable(20181216, TYPE_INT)
_beatmap_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING) #_beatmap_hash_length = Serializable(32, TYPE_ULEB)
_player_name = Serializable(b"osu!", TYPE_STRING) _beatmap_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
_score_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING) #_player_name_length = Serializable(4, TYPE_ULEB)
_player_name = Serializable(b"osu!", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
#_score_hash_length = Serializable(32, TYPE_ULEB)
_score_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
_score_300s = Serializable(0, TYPE_USHORT) _score_300s = Serializable(0, TYPE_USHORT)
_score_100s = Serializable(0, TYPE_USHORT) _score_100s = Serializable(0, TYPE_USHORT)
_score_50s = Serializable(0, TYPE_USHORT) _score_50s = Serializable(0, TYPE_USHORT)
_score_gekis = Serializable(0, TYPE_USHORT) _score_gekis = Serializable(0, TYPE_USHORT)
_score_katus = Serializable(0, TYPE_USHORT) _score_katus = Serializable(0, TYPE_USHORT)
_score_miss = Serializable(0, TYPE_USHORT) _score_miss = Serializable(0, TYPE_USHORT)
_score = Serializable(0, TYPE_INT) _score = Serializable(0, TYPE_INT)
_combo = Serializable(0, TYPE_USHORT) _combo = Serializable(0, TYPE_USHORT)
_perfect = Serializable(True, TYPE_BOOL) _perfect = Serializable(True, TYPE_BOOL)
_mods = Serializable(osuMods.NOMOD, TYPE_INT) _mods = Serializable(osuMods.NOMOD, TYPE_INT)
_lifebar_graph = Serializable(b"0|1,", TYPE_STRING) #_lifebar_graph_length = Serializable(4, TYPE_ULEB)
_timestamp = Serializable(0, TYPE_ULLONG) _lifebar_graph = Serializable(b"0|1,", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
_timestamp = Serializable(0, TYPE_ULLONG)
_replay_data = Serializable(b"", TYPE_BYTESTREAM, post = append_and_compress) #_replay_data_length = Serializable(0, TYPE_UINT)
_replay_data = Serializable(b"", TYPE_STRING, length = TYPE_UINT, filter = append_and_compress)
_online_score_id= Serializable(0, TYPE_ULLONG) _online_score_id = Serializable(0, TYPE_ULLONG)
# Marker (Only used as attribute section marker for auto-serializing) # Marker (Only used as attribute section marker for auto-serializing)
_end_of_attributes = None _end_of_attributes = None
@ -245,4 +255,17 @@ class Replay:
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
with open(filename, "wb") as f: with open(filename, "wb") as f:
f.write( serializer.flush() ) f.write( serializer.serialize() )
def load(self, filename):
serializer = Serializer()
attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
for attrib in attribs:
if attrib == "_end_of_attributes":
break
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
with open(filename, "rb") as f:
serializer.deserialize( f.read() )

15
test.py
View File

@ -3,17 +3,8 @@ from osuRepy.replay import Replay # Replay instance
from osuRepy.helpers import osuButtons, osuMods # Enum helpers from osuRepy.helpers import osuButtons, osuMods # Enum helpers
replay = Replay(beatmap_hash = "9e66c0a0eadced7d07f06b3968a74cc0") # Create replay instance replay = Replay() # Create replay instance
replay.write( ReplayFrame(2, 256, 192, osuButtons.M1) ) # Add replay frame (time, x, y, buttons) replay.load("replay.osr")
replay.set_score(score = 29284624, replay.save("test.osr")
s300 = 416,
s50 = 2,
miss = 1,
combo = 358) # Set score info
replay.set_mods(osuMods.HIDDEN | osuMods.DOUBLETIME) # Enable HDDT mods
replay.set_timestamp(636880715111611126, False)
replay.save("myReplay.osr") # Export replay to file