181 lines
5.3 KiB
Python
181 lines
5.3 KiB
Python
import struct
|
|
import string
|
|
import lzma
|
|
|
|
from .helpers import osuModes
|
|
from .helpers import osuMods
|
|
from .helpers import osuRanks
|
|
from .helpers import typeSerializer
|
|
|
|
from hashlib import md5 as _md5
|
|
|
|
def md5(str):
    """Return the MD5 hex digest of an ASCII string, encoded as bytes.

    The input is encoded as ASCII before hashing, so a string containing
    non-ASCII characters raises ``UnicodeEncodeError``.
    """
    ascii_payload = str.encode("ascii")
    hex_digest = _md5(ascii_payload).hexdigest()
    return hex_digest.encode()
|
|
|
|
class Replay:
    """In-memory osu! replay that can be written out as an ``.osr`` file.

    Score counters (``score_300s``, ``combo``, ...) are plain attributes and
    are meant to be assigned directly. Fields that need validation have
    ``set_*`` methods. ``save()`` refreshes the derived fields (``perfect``
    and ``score_hash``) before serializing.

    The trailing comment on each attribute is the on-disk type used by
    ``save()``.
    """

    mode = osuModes.STANDARD  # Byte
    osu_version = 20131216  # Int
    beatmap_hash = b"d41d8cd98f00b204e9800998ecf8427e"  # Byte[]
    player_name = b"osu!"  # Byte[]
    score_hash = b"d41d8cd98f00b204e9800998ecf8427e"  # Byte[]

    score_300s = 0  # uShort
    score_100s = 0  # uShort
    score_50s = 0  # uShort
    score_gekis = 0  # uShort
    score_katus = 0  # uShort
    score_miss = 0  # uShort
    score = 0  # Int
    combo = 0  # uShort
    perfect = True  # Byte

    mods = osuMods.NOMOD  # Int

    lifebar_graph = b"0|1,"  # Byte[]
    timestamp = 0  # Long

    replay_data = b""  # Byte[]

    online_score_id = 0  # Long

    # Marker byte written in front of every string field.
    _STRING_MARKER = b"\x0b"
    # Frame appended after the recorded frames when saving.
    _FINAL_FRAME = b"-12345|0|0|1337,"

    def __init__(self, **kwargs):
        """Create a replay, applying any recognised keyword arguments.

        Recognised keys are ``mode``, ``mods``, ``osu_version``,
        ``player_name`` and ``replay_data``; each is routed through the
        matching setter (``replay_data`` goes through ``write``). Unknown
        keys are ignored.
        """
        setters = {
            "mode": self.set_mode,
            "mods": self.set_mods,
            "osu_version": self.set_osu_version,
            "player_name": self.set_player_name,
            "replay_data": self.write,
        }
        for key, value in kwargs.items():
            setter = setters.get(key)
            if setter is not None:
                setter(value)

    # Internal helpers ---------------------------------------------------------

    @staticmethod
    def _as_text(value):
        """Return ``value`` as ``str``, decoding it as UTF-8 if it is bytes."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8")
        return value

    @staticmethod
    def _validated_md5(md5_hash, error_message):
        """Return ``md5_hash`` as 32 ASCII hex bytes.

        Accepts ``str`` or ``bytes``.

        Raises:
            ValueError: with ``error_message`` if the value is not exactly
                32 hexadecimal characters.
        """
        if isinstance(md5_hash, (bytes, bytearray)):
            try:
                md5_hash = bytes(md5_hash).decode("ascii")
            except UnicodeDecodeError as err:
                raise ValueError(error_message) from err
        if len(md5_hash) != 32 or any(
            char not in string.hexdigits for char in md5_hash
        ):
            raise ValueError(error_message)
        return md5_hash.encode("ascii")

    @staticmethod
    def _uleb128(value):
        """Encode a non-negative integer as ULEB128 bytes."""
        encoded = bytearray()
        while True:
            low_bits = value & 0x7F
            value >>= 7
            if value:
                # More bits follow: set the continuation flag.
                encoded.append(low_bits | 0x80)
            else:
                encoded.append(low_bits)
                return bytes(encoded)

    @classmethod
    def _pack_string(cls, value):
        """Serialize a string field: marker byte, ULEB128 length, payload.

        ``str`` values are encoded as UTF-8 first.
        """
        if isinstance(value, str):
            value = value.encode("utf-8")
        payload = bytes(value)
        return cls._STRING_MARKER + cls._uleb128(len(payload)) + payload

    # Set validators -----------------------------------------------------------

    def set_mode(self, mode_id):
        """Set the game mode.

        Raises:
            ValueError: if ``mode_id`` is outside ``0..3``.
        """
        if mode_id > 3 or mode_id < 0:
            raise ValueError("Invalid mode")
        # ``save()`` serializes ``self.mode``, so that is the attribute that
        # has to be updated.
        self.mode = mode_id
        # Kept for code that read the attribute this setter used to assign.
        self.mode_id = mode_id

    def set_osu_version(self, osu_version):
        """Set the osu! client version number.

        Raises:
            TypeError: if ``osu_version`` is not an ``int`` (``bool`` is
                rejected as well).
        """
        if isinstance(osu_version, bool) or not isinstance(osu_version, int):
            raise TypeError("osu! version must be an int")
        self.osu_version = osu_version

    def set_beatmap_hash(self, md5_hash):
        """Set the beatmap MD5 hash from a 32-character hex ``str`` or ``bytes``.

        The value is stored as bytes, matching the class default and what
        ``save()`` needs.

        Raises:
            ValueError: if the value is not 32 hexadecimal characters.
        """
        self.beatmap_hash = self._validated_md5(md5_hash, "Invalid beatmap hash")

    def set_player_name(self, player_name):
        """Set the player name; ``str`` input is stored UTF-8 encoded."""
        if isinstance(player_name, str):
            player_name = player_name.encode("utf-8")
        self.player_name = player_name

    def set_score_hash(self, md5_hash):
        """Set the score MD5 hash from a 32-character hex ``str`` or ``bytes``.

        Raises:
            ValueError: if the value is not 32 hexadecimal characters.
        """
        self.score_hash = self._validated_md5(md5_hash, "Invalid replay hash")

    def set_mods(self, mods):
        """Set the enabled-mods bitmask.

        Raises:
            ValueError: if ``mods`` is negative or does not fit in 30 bits.
        """
        if mods < 0 or mods > (1 << 30) - 1:
            raise ValueError("Mods are out of range")
        self.mods = mods

    # --------------------------------------------------------------------------
    # Update variables ---------------------------------------------------------

    def update_perfect(self):
        """Recompute ``perfect``: true when there are no 100s, 50s, katus or misses."""
        imperfect_hits = (
            self.score_100s, self.score_50s, self.score_katus, self.score_miss
        )
        self.perfect = sum(imperfect_hits) == 0

    def update_score_hash(self):
        """Recompute ``score_hash`` from combo, player, beatmap, score and rank.

        Raises:
            ValueError: from ``get_rank()`` when no hit counts are set.
        """
        # Decode the bytes fields first: "%s" applied to bytes would embed
        # their repr (b'...') in the hashed text.
        self.set_score_hash(
            md5("%d%s%s%s%d%d" % (
                self.combo, "osu", self._as_text(self.player_name),
                self._as_text(self.beatmap_hash), self.score, self.get_rank()
            ))
        )

    def update(self):
        """Refresh every derived field (``perfect`` and ``score_hash``)."""
        self.update_perfect()
        self.update_score_hash()

    # --------------------------------------------------------------------------
    # Get / Helpers ------------------------------------------------------------

    def get_hits(self):
        """Return the number of successful hits (300s + 100s + 50s)."""
        return self.score_300s + self.score_100s + self.score_50s

    def get_possible_hits(self):
        """Return the total number of judged objects, misses included."""
        return self.get_hits() + self.score_miss

    def get_rank(self):  # We dont give out F ranks around here
        """Return the ``osuRanks`` grade derived from the hit counts.

        The SS and S grades become their "H" variants when Hidden or
        Flashlight is enabled.

        Raises:
            ValueError: if every hit counter is zero.
        """
        hits = self.get_possible_hits()
        if hits == 0:
            raise ValueError("Can not calculate rank without any score data")

        r300 = self.score_300s / hits
        r50 = self.score_50s / hits
        h = osuMods.any_enabled(self.mods, osuMods.HIDDEN | osuMods.FLASHLIGHT)
        no_miss = self.score_miss == 0

        if r300 == 1:
            return osuRanks.SSH if h else osuRanks.SS
        if r300 > .9 and r50 <= .01 and no_miss:
            return osuRanks.SH if h else osuRanks.S
        if (r300 > .8 and no_miss) or r300 > .9:
            return osuRanks.A
        if (r300 > .7 and no_miss) or r300 > .8:
            return osuRanks.B
        if r300 > .6:
            return osuRanks.C
        return osuRanks.D

    # --------------------------------------------------------------------------
    # Write replay data --------------------------------------------------------

    def write(self, frame):
        """Append one frame, or a (possibly nested) list of frames.

        A non-list frame is converted with ``bytes(frame)``; ``str`` frames
        are UTF-8 encoded first, since ``bytes(str)`` needs an encoding.
        """
        if isinstance(frame, list):
            for _frame in frame:  # (frames)
                self.write(_frame)
            return

        if isinstance(frame, str):
            frame = frame.encode("utf-8")
        self.replay_data += bytes(frame)

    # --------------------------------------------------------------------------
    # IO replay data -----------------------------------------------------------

    def save(self, filename):
        """Serialize the replay to ``filename``.

        Derived fields are refreshed first. The whole payload is built in
        memory before the file is opened, so a serialization error does not
        leave a truncated file behind.

        Raises:
            ValueError: from ``get_rank()`` when no hit counts are set.
            struct.error: if a numeric field does not fit its on-disk type.
        """
        self.update()

        compressed_replay = lzma.compress(
            self.replay_data + self._FINAL_FRAME,
            format=lzma.FORMAT_ALONE,
            filters=[
                {
                    "id": lzma.FILTER_LZMA1,
                    "preset": lzma.PRESET_DEFAULT,
                    "dict_size": 1 << 21,  # !important
                }
            ],
        )

        data = b"".join((
            struct.pack("<Bi", self.mode, self.osu_version),
            self._pack_string(self.beatmap_hash),
            self._pack_string(self.player_name),
            self._pack_string(self.score_hash),
            struct.pack(
                "<6HiH?i",
                self.score_300s, self.score_100s, self.score_50s,
                self.score_gekis, self.score_katus, self.score_miss,
                self.score,
                self.combo,
                self.perfect,
                self.mods,
            ),
            self._pack_string(self.lifebar_graph),
            # The compressed payload is prefixed with a 4-byte length; a
            # single length byte cannot describe payloads over 255 bytes.
            struct.pack("<Qi", self.timestamp, len(compressed_replay)),
            compressed_replay,
            struct.pack("<Q", self.online_score_id),
        ))

        with open(filename, "wb") as f:
            f.write(data)
|