import struct import string import lzma from .helpers import osuModes from .helpers import osuMods from .helpers import osuRanks from .helpers import typeSerializer from io import BufferedReader from os.path import isfile from hashlib import md5 as _md5 def md5_str(data): if type(data) is str: data = data.encode("ascii") return _md5(data).hexdigest().encode() def md5_file(file): if type(file) is not BufferedReader: raise Exception("File is not a BufferedReader") if file.mode != "rb": raise Exception("File is not in ReadBytes mode") hash = _md5() for chunk in iter(lambda: file.read(4096), b""): hash.update(chunk) return hash.hexdigest() class Replay: mode = osuModes.STANDARD # Byte osu_version = 20131216 # Int beatmap_hash = b"d41d8cd98f00b204e9800998ecf8427e" # Byte[] player_name = b"osu!" # Byte[] score_hash = b"d41d8cd98f00b204e9800998ecf8427e" # Byte[] score_300s = 0 # uShort score_100s = 0 # uShort score_50s = 0 # uShort score_gekis = 0 # uShort score_katus = 0 # uShort score_miss = 0 # uShort score = 0 # Int combo = 0 # uShort perfect = True # Byte mods = osuMods.NOMOD # Int lifebar_graph = b"0|1," # Byte[] timestamp = 0 # Long replay_data = b"" # Byte[] online_score_id = 0 # Long def __init__(self, **kwargs): allowed_kwargs = { "mode": self.set_mode, "mods": self.set_mods, "osu_version": self.set_osu_version, "player_name": self.set_player_name, "replay_data": self.write } for k, v in kwargs.items(): if k in allowed_kwargs.keys(): allowed_kwargs[k](v) # Run callback func # Set validators ----------------------------------------------------------- def set_mode(self, mode_id): if mode_id > 3 or mode_id < 0: raise Exception("Invalid mode") self.mode_id = mode_id def set_osu_version(self, osu_version): if type(osu_version) is not int: raise Exception("osu! version must be an int") self.osu_version = osu_version def set_beatmap_hash(self, md5_hash): if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32: raise Exception("Invalid beatmap hash") self.beatmap_hash = md5_hash def set_beatmap(self, filepath): if not isfile(filepath): raise Exception("Beatmap file not found") with open(filepath, "rb") as f: self.beatmap_hash = md5_file(f) def set_player_name(self, player_name): self.player_name = player_name def set_score_hash(self, md5_hash): if type(md5_hash) is bytes: md5_hash = md5_hash.decode() if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32: raise Exception("Invalid replay hash") self.score_hash = md5_hash.encode() def set_mods(self, mods): if mods < 0 or mods > (1 << 30) - 1: raise Exception("Mods are out of range") self.mods = mods # -------------------------------------------------------------------------- # Update variables --------------------------------------------------------- def update_perfect(self): m = [self.score_100s, self.score_50s, self.score_katus, self.score_miss] self.perfect = sum(m) == 0 def update_score_hash(self): self.set_score_hash( md5_str("%d%s%s%s%d%d" % ( self.combo, "osu", self.player_name, self.beatmap_hash, self.score, self.get_rank() )) ) def update(self): self.update_perfect() self.update_score_hash() # -------------------------------------------------------------------------- # Get / Helpers ------------------------------------------------------------ def get_hits(self): return sum([self.score_300s, self.score_100s, self.score_50s]) def get_possible_hits(self): return self.get_hits() + self.score_miss def get_rank(self): # We dont give out F ranks around here hits = self.get_possible_hits() if hits == 0: raise Exception("Can not calculate rank without any score data") r300 = self.score_300s / hits r50 = self.score_50s / hits h = osuMods.any_enabled(self.mods, osuMods.HIDDEN | osuMods.FLASHLIGHT) if r300 == 1: return osuRanks.SSH if h else osuRanks.SS if r300 > .9 and r50 <= .01 and self.score_miss == 0: return osuRanks.SH if h else osuRanks.S if r300 > .8 and self.score_miss == 0 or r300 > .9: return osuRanks.A if r300 > .7 and self.score_miss == 0 or r300 > .8: return osuRanks.B if r300 > .6: return osuRanks.C return osuRanks.D # -------------------------------------------------------------------------- # Write replay data -------------------------------------------------------- def write(self, frame): if type(frame) is list: for _frame in frame: # (frames) self.write(_frame) return self.replay_data += bytes(frame) # -------------------------------------------------------------------------- # IO replay data ----------------------------------------------------------- def save(self, filename): self.update() compressed_replay = lzma.compress(self.replay_data + b"-12345|0|0|1337,", format = lzma.FORMAT_ALONE, filters = [ { "id": lzma.FILTER_LZMA1, "preset": lzma.PRESET_DEFAULT, "dict_size": 1 << 21, # !important } ]) _player_name = bytes([len(self.player_name)]) + self.player_name _lifebar_graph = bytes([len(self.lifebar_graph)]) + self.lifebar_graph _compressed_replay = bytes([len(compressed_replay)]) + compressed_replay with open(filename, "wb") as f: # Yes... Please never do this data = struct.pack(b"