import struct
import string
import lzma
import pylzma # Required because Python's standard-library lzma lacks the needed support
from .frame import ReplayFrame
from .helpers import osuModes
from .helpers import osuMods
from .helpers import osuRanks
from .helpers.typeSerializer import *
from io import BufferedReader
from os.path import isfile
from hashlib import md5 as _md5
import traceback

def md5_str(data):
    # Returns the hex digest as bytes
    if type(data) is str:
        data = data.encode("ascii")
    return _md5(data).hexdigest().encode()

def md5_file(file):
    # Returns the hex digest as str; expects a file opened with open(..., "rb")
    if type(file) is not BufferedReader:
        raise Exception("File is not a BufferedReader")
    if file.mode != "rb":
        raise Exception("File is not in ReadBytes mode")
    hasher = _md5()
    for chunk in iter(lambda: file.read(4096), b""):
        hasher.update(chunk)
    return hasher.hexdigest()

def append_and_compress(data, mode):
    # mode 0: append the trailing marker frame and compress
    # any other mode: decompress and strip the trailing marker frame
    if mode == 0:
        return lzma_compress(data + b"-12345|0|0|1337,")
    else:
        return lzma_decompress(data)[:-len(b"-12345|0|0|1337,")]

def lzma_compress(data):
    comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False)
    comp = comp[:5] + struct.pack(b" 3 or mode < 0:
            raise Exception("Invalid mode")
        self._mode.value = mode

    def set_osu_version(self, osu_version):
        if type(osu_version) is not int:
            raise Exception("osu! version must be an int")
        self._osu_version.value = osu_version

    def set_beatmap_hash(self, md5_hash):
        if type(md5_hash) is bytes:
            md5_hash = md5_hash.decode()
        if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
            raise Exception("Invalid beatmap hash")
        self._beatmap_hash.value = md5_hash.encode()

    def set_beatmap_file(self, filepath):
        if not isfile(filepath):
            raise Exception("Beatmap file not found")
        with open(filepath, "rb") as f:
            # md5_file returns str; go through the setter so the hash is
            # validated and stored as bytes, which update_score_hash's %b needs
            self.set_beatmap_hash(md5_file(f))

    def set_player_name(self, player_name):
        if type(player_name) is str:
            player_name = player_name.encode()
        self._player_name.value = player_name

    def set_score_hash(self, md5_hash):
        if type(md5_hash) is bytes:
            md5_hash = md5_hash.decode()
        if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
            raise Exception("Invalid replay hash")
        self._score_hash.value = md5_hash.encode()

    def set_score(self, score = None, combo = None, s300 = None, s100 = None, s50 = None, sgekis = None, skatus = None, miss = None):
        # Only arguments passed as int are applied; everything else is left unchanged
        if type(score) is int:
            self._score.value = score
        if type(combo) is int:
            self._combo.value = combo
        if type(s300) is int:
            self._score_300s.value = s300
        if type(s100) is int:
            self._score_100s.value = s100
        if type(s50) is int:
            self._score_50s.value = s50
        if type(sgekis) is int:
            self._score_gekis.value = sgekis
        if type(skatus) is int:
            self._score_katus.value = skatus
        if type(miss) is int:
            self._score_miss.value = miss

    def set_mods(self, mods):
        if mods < 0 or mods > (1 << 30) - 1:
            raise Exception("Mods are out of range")
        self._mods.value = mods

    def set_lifebar_graph(self, graph):
        t_graph = type(graph)
        if t_graph is list:
            self._lifebar_graph.value = b"".join(graph)
        elif t_graph is str:
            self._lifebar_graph.value = graph.encode()
        elif t_graph is bytes:
            self._lifebar_graph.value = graph
        else:
            raise Exception("Invalid lifebar data")

    def set_timestamp(self, timestamp, use_unix = True):
        if use_unix:
            timestamp += 62135599380000 # offset
            timestamp *= 10 ** 4
        self._timestamp.value = timestamp

    def set_online_score_id(self, online_score_id):
        self._online_score_id.value = online_score_id

    # --------------------------------------------------------------------------

    # Update variables ---------------------------------------------------------
    def update_perfect(self):
        m = [self._score_100s.value, self._score_50s.value, self._score_katus.value, self._score_miss.value]
        self._perfect.value = sum(m) == 0

    def update_score_hash(self):
        self.set_score_hash(
            md5_str(b"%d%b%b%b%d%b" % (
                self._combo.value,
                b"osu",
                self._player_name.value,
                self._beatmap_hash.value,
                self._score.value,
                self.get_rank()
            ))
        )

    def update(self):
        self.update_perfect()
        self.update_score_hash()

    # --------------------------------------------------------------------------

    # Get / Helpers ------------------------------------------------------------
    def get_hits(self):
        return sum([self._score_300s.value, self._score_100s.value, self._score_50s.value])

    def get_possible_hits(self):
        return self.get_hits() + self._score_miss.value

    def get_rank(self):
        hits = self.get_possible_hits()
        if hits == 0:
            print("Can not calculate rank without any score data (Defaulting to Fail)")
            return osuRanks.F

        r300 = self._score_300s.value / hits
        r50 = self._score_50s.value / hits
        # Hidden or Flashlight upgrades SS and S to their silver variants
        h = osuMods.any_enabled(self._mods.value, osuMods.HIDDEN | osuMods.FLASHLIGHT)

        if r300 == 1:
            return osuRanks.SSH if h else osuRanks.SS
        if r300 > .9 and r50 <= .01 and self._score_miss.value == 0:
            return osuRanks.SH if h else osuRanks.S
        if (r300 > .8 and self._score_miss.value == 0) or r300 > .9:
            return osuRanks.A
        if (r300 > .7 and self._score_miss.value == 0) or r300 > .8:
            return osuRanks.B
        if r300 > .6:
            return osuRanks.C
        return osuRanks.D

    # --------------------------------------------------------------------------

    # Write replay data --------------------------------------------------------
    def write(self, frame):
        t_frame = type(frame)
        if t_frame is list:
            for _frame in frame: # (frames)
                self.write(_frame)
            return

        if t_frame is ReplayFrame:
            self._replay_data.value += bytes(frame)
        elif t_frame is bytes:
            self._replay_data.value += frame
        elif t_frame is str:
            self._replay_data.value += frame.encode()
        else:
            raise Exception("Invalid frame data")

    # --------------------------------------------------------------------------

    # IO replay data -----------------------------------------------------------
    def save(self, filename):
        self.update()
        serializer = Serializer()
        # Relies on the order returned by __dir__(); do not replace with dir(), which sorts
        attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
        for attrib in attribs:
            if attrib == "_end_of_attributes":
                break
            serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
        with open(filename, "wb") as f:
            f.write( serializer.serialize() )

    def load(self, filename):
        serializer = Serializer()
        # Same attribute walk as save(): everything before _end_of_attributes
        attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
        for attrib in attribs:
            if attrib == "_end_of_attributes":
                break
            serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
        with open(filename, "rb") as f:
            serializer.deserialize( f.read() )