From 677ece2cb9fd7b6b3ab0c1afc714740485d38ce3 Mon Sep 17 00:00:00 2001 From: Sunpy Date: Wed, 13 Mar 2019 09:07:46 +0100 Subject: [PATCH] type rewrite --- osuRepy/frame.py | 2 +- osuRepy/helpers/__init__.py | 2 + osuRepy/helpers/typeSerializer.py | 109 ++++++++++++++++++++++++ osuRepy/replay.py | 133 ++++++++++++++---------------- test.py | 4 +- 5 files changed, 178 insertions(+), 72 deletions(-) create mode 100644 osuRepy/helpers/typeSerializer.py diff --git a/osuRepy/frame.py b/osuRepy/frame.py index ae7dabd..5b49b34 100644 --- a/osuRepy/frame.py +++ b/osuRepy/frame.py @@ -27,4 +27,4 @@ class ReplayFrame: return "%s|%s|%s|%s," % (self.delta, self.x, self.y, self.buttons) def __bytes__(self): - return str(self).encode() \ No newline at end of file + return str(self).encode() diff --git a/osuRepy/helpers/__init__.py b/osuRepy/helpers/__init__.py index 8d94b5a..d59e963 100644 --- a/osuRepy/helpers/__init__.py +++ b/osuRepy/helpers/__init__.py @@ -2,3 +2,5 @@ from . import osuModes from . import osuMods from . import osuRanks from . import osuButtons + +from . import typeSerializer \ No newline at end of file diff --git a/osuRepy/helpers/typeSerializer.py b/osuRepy/helpers/typeSerializer.py new file mode 100644 index 0000000..d0c13af --- /dev/null +++ b/osuRepy/helpers/typeSerializer.py @@ -0,0 +1,109 @@ +import struct + +TYPE_CHAR = b"b" +TYPE_BYTE = b"B" +TYPE_BOOL = b"?" +TYPE_SHORT = b"h" +TYPE_USHORT = b"H" +TYPE_INT = b"i" +TYPE_UINT = b"I" +TYPE_LONG = b"l" +TYPE_ULONG = b"L" +TYPE_LLONG = b"q" +TYPE_ULLONG = b"Q" +TYPE_FLOAT = b"f" +TYPE_DOUBLE = b"d" +TYPE_STRING = b"B%ds" +TYPE_BYTESTREAM = b"%ds" + +BYTEORDER_NATIVE = b"@" +BYTEORDER_LITTLE = b"<" +BYTEORDER_BIG = b">" + +def guess_type(value): + t_value = type(value) + + if t_value is bool: + return TYPE_BOOL + if t_value is float: + return TYPE_FLOAT + if t_value is str: + return TYPE_STRING + if t_value is bytes: + return TYPE_STRING + if t_value is int: + bit_len = 0 + while value: + value >>= 1 + bit_len += 1 + + byte_len = (bit_len + 1) // 2 + if byte_len <= 1: + return TYPE_BYTE + if byte_len <= 2: + return TYPE_USHORT + if byte_len <= 4: + return TYPE_UINT + if byte_len <= 8: + return TYPE_ULLONG + + raise Exception("Unable to guess type") + +class Serializable: + post_serialized = None + + def __init__(self, value, type = None, post = None): + self.value = value + + if type is not None: + self.type = type + else: + self.type = guess_type(type) + + self.post_serialized = post + + def get_struct_fmt(self): + if self.type in [TYPE_STRING, TYPE_BYTESTREAM]: + return self.type % len(self.get_value()[-1]) + return self.type + + def get_value(self): + def _get_value(): + if self.type == TYPE_STRING: + return [11, bytes([len(self.value)]) + self.value] + return [self.value] + + val = _get_value() + if self.post_serialized is not None: + val[-1] = self.post_serialized(val[-1]) + + return val + + def pack(self, byte_order = BYTEORDER_LITTLE): + struct_fmt = byte_order + self.get_struct_fmt() + print(struct_fmt, self.get_value()) + return struct.pack(struct_fmt, *self.get_value()) + +class Serializer: + def __init__(self, byte_order = BYTEORDER_LITTLE): + self.stack = [] + self.byte_order = byte_order + + def add(self, serializable): + if type(serializable) is list: + self.stack += serializable + else: + self.stack.append(serializable) + + def flush(self): + return b"".join(s.pack() for s in self.stack) + """ + struct_fmt = self.byte_order + b"".join( s.get_struct_fmt() for s in self.stack ) + + print(struct_fmt) + print([ s.get_value() for s in self.stack ]) + return struct.pack(struct_fmt, *[ s.get_value() for s in self.stack ]) + """ + + def __bytes__(self): + return self.flush() diff --git a/osuRepy/replay.py b/osuRepy/replay.py index db733c1..d1ad5da 100644 --- a/osuRepy/replay.py +++ b/osuRepy/replay.py @@ -6,6 +6,8 @@ from .helpers import osuModes from .helpers import osuMods from .helpers import osuRanks +from .helpers.typeSerializer import * + from io import BufferedReader from os.path import isfile from hashlib import md5 as _md5 @@ -27,31 +29,43 @@ def md5_file(file): hash.update(chunk) return hash.hexdigest() +def lzma_compress(data): + return bytes([len(data)]) + lzma.compress(data + b"-12345|0|0|1337,", format = lzma.FORMAT_ALONE, filters = [ + { + "id": lzma.FILTER_LZMA1, + "preset": lzma.PRESET_DEFAULT, + "dict_size": 1 << 21 + } + ]) + class Replay: - mode = osuModes.STANDARD # Byte - osu_version = 20131216 # Int - beatmap_hash = b"d41d8cd98f00b204e9800998ecf8427e" # Byte[] - player_name = b"osu!" # Byte[] - score_hash = b"d41d8cd98f00b204e9800998ecf8427e" # Byte[] + mode = Serializable(osuModes.STANDARD, TYPE_BYTE) + osu_version = Serializable(20131216, TYPE_INT) + beatmap_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING) + player_name = Serializable(b"osu!", TYPE_STRING) + score_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING) - score_300s = 0 # uShort - score_100s = 0 # uShort - score_50s = 0 # uShort - score_gekis = 0 # uShort - score_katus = 0 # uShort - score_miss = 0 # uShort - score = 0 # Int - combo = 0 # uShort - perfect = True # Byte + score_300s = Serializable(0, TYPE_USHORT) + score_100s = Serializable(0, TYPE_USHORT) + score_50s = Serializable(0, TYPE_USHORT) + score_gekis = Serializable(0, TYPE_USHORT) + score_katus = Serializable(0, TYPE_USHORT) + score_miss = Serializable(0, TYPE_USHORT) + score = Serializable(0, TYPE_INT) + combo = Serializable(0, TYPE_USHORT) + perfect = Serializable(True, TYPE_BOOL) - mods = osuMods.NOMOD # Int + mods = Serializable(osuMods.NOMOD, TYPE_INT) - lifebar_graph = b"0|1," # Byte[] - timestamp = 0 # Long + lifebar_graph = Serializable(b"0|1,", TYPE_STRING) + timestamp = Serializable(0, TYPE_ULLONG) - replay_data = b"" # Byte[] + replay_data = Serializable(b"", TYPE_BYTESTREAM, post = lzma_compress) - online_score_id = 0 # Long + online_score_id = Serializable(0, TYPE_ULLONG) + + # Marker (Only used as attribute section marker for auto-serializing) + end_of_attributes = None def __init__(self, **kwargs): allowed_kwargs = { @@ -67,54 +81,54 @@ class Replay: # Set validators ----------------------------------------------------------- - def set_mode(self, mode_id): - if mode_id > 3 or mode_id < 0: + def set_mode(self, mode): + if mode > 3 or mode < 0: raise Exception("Invalid mode") - self.mode_id = mode_id + self.mode.value = mode def set_osu_version(self, osu_version): if type(osu_version) is not int: raise Exception("osu! version must be an int") - self.osu_version = osu_version + self.osu_version.value = osu_version def set_beatmap_hash(self, md5_hash): if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32: raise Exception("Invalid beatmap hash") - self.beatmap_hash = md5_hash + self.beatmap_hash.value = md5_hash def set_beatmap(self, filepath): if not isfile(filepath): raise Exception("Beatmap file not found") with open(filepath, "rb") as f: - self.beatmap_hash = md5_file(f) + self.beatmap_hash.value = md5_file(f) def set_player_name(self, player_name): - self.player_name = player_name + self.player_name.value = player_name def set_score_hash(self, md5_hash): if type(md5_hash) is bytes: md5_hash = md5_hash.decode() if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32: raise Exception("Invalid replay hash") - self.score_hash = md5_hash.encode() + self.score_hash.value = md5_hash.encode() def set_mods(self, mods): if mods < 0 or mods > (1 << 30) - 1: raise Exception("Mods are out of range") - self.mods = mods + self.mods.value = mods # -------------------------------------------------------------------------- # Update variables --------------------------------------------------------- def update_perfect(self): - m = [self.score_100s, self.score_50s, self.score_katus, self.score_miss] - self.perfect = sum(m) == 0 + m = [self.score_100s.value, self.score_50s.value, self.score_katus.value, self.score_miss.value] + self.perfect.value = sum(m) == 0 def update_score_hash(self): self.set_score_hash( md5_str("%d%s%s%s%d%d" % ( - self.combo, "osu", self.player_name, - self.beatmap_hash, self.score, self.get_rank() + self.combo.value, "osu", self.player_name.value, + self.beatmap_hash.value, self.score.value, self.get_rank() )) ) @@ -126,27 +140,27 @@ class Replay: # Get / Helpers ------------------------------------------------------------ def get_hits(self): - return sum([self.score_300s, self.score_100s, self.score_50s]) + return sum([self.score_300s.value, self.score_100s.value, self.score_50s.value]) def get_possible_hits(self): - return self.get_hits() + self.score_miss + return self.get_hits() + self.score_miss.value def get_rank(self): # We dont give out F ranks around here hits = self.get_possible_hits() if hits == 0: raise Exception("Can not calculate rank without any score data") - r300 = self.score_300s / hits - r50 = self.score_50s / hits - h = osuMods.any_enabled(self.mods, osuMods.HIDDEN | osuMods.FLASHLIGHT) + r300 = self.score_300s.value / hits + r50 = self.score_50s.value / hits + h = osuMods.any_enabled(self.mods.value, osuMods.HIDDEN | osuMods.FLASHLIGHT) if r300 == 1: return osuRanks.SSH if h else osuRanks.SS - if r300 > .9 and r50 <= .01 and self.score_miss == 0: + if r300 > .9 and r50 <= .01 and self.score_miss.value == 0: return osuRanks.SH if h else osuRanks.S - if r300 > .8 and self.score_miss == 0 or r300 > .9: + if r300 > .8 and self.score_miss.value == 0 or r300 > .9: return osuRanks.A - if r300 > .7 and self.score_miss == 0 or r300 > .8: + if r300 > .7 and self.score_miss.value == 0 or r300 > .8: return osuRanks.B if r300 > .6: return osuRanks.C @@ -161,7 +175,7 @@ class Replay: self.write(_frame) return - self.replay_data += bytes(frame) + self.replay_data.value += bytes(frame) # -------------------------------------------------------------------------- # IO replay data ----------------------------------------------------------- @@ -169,34 +183,15 @@ class Replay: def save(self, filename): self.update() - compressed_replay = lzma.compress(self.replay_data + b"-12345|0|0|1337,", format = lzma.FORMAT_ALONE, filters = [ - { - "id": lzma.FILTER_LZMA1, - "preset": lzma.PRESET_DEFAULT, - "dict_size": 1 << 21, # !important - } - ]) + serializer = Serializer() - _player_name = bytes([len(self.player_name)]) + self.player_name - _lifebar_graph = bytes([len(self.lifebar_graph)]) + self.lifebar_graph - _compressed_replay = bytes([len(compressed_replay)]) + compressed_replay + attribs = [ a for a in self.__dir__() if not a.startswith("__") ] + for attrib in attribs: + if attrib == "end_of_attributes": + break + + serializer.add( self.__getattribute__(attrib) ) # Add value to serializer with open(filename, "wb") as f: - # Yes... Please never do this - data = struct.pack(b"