osu-repy/osuRepy/replay.py
2019-03-13 09:08:44 +01:00

197 lines
6.0 KiB
Python

import struct
import string
import lzma
from .helpers import osuModes
from .helpers import osuMods
from .helpers import osuRanks
from .helpers.typeSerializer import *
from io import BufferedReader
from os.path import isfile
from hashlib import md5 as _md5
def md5_str(data):
if type(data) is str:
data = data.encode("ascii")
return _md5(data).hexdigest().encode()
def md5_file(file):
if type(file) is not BufferedReader:
raise Exception("File is not a BufferedReader")
if file.mode != "rb":
raise Exception("File is not in ReadBytes mode")
hash = _md5()
for chunk in iter(lambda: file.read(4096), b""):
hash.update(chunk)
return hash.hexdigest()
def lzma_compress(data):
return bytes([len(data)]) + lzma.compress(data + b"-12345|0|0|1337,", format = lzma.FORMAT_ALONE, filters = [
{
"id": lzma.FILTER_LZMA1,
"preset": lzma.PRESET_DEFAULT,
"dict_size": 1 << 21
}
])
class Replay:
mode = Serializable(osuModes.STANDARD, TYPE_BYTE)
osu_version = Serializable(20131216, TYPE_INT)
beatmap_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)
player_name = Serializable(b"osu!", TYPE_STRING)
score_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)
score_300s = Serializable(0, TYPE_USHORT)
score_100s = Serializable(0, TYPE_USHORT)
score_50s = Serializable(0, TYPE_USHORT)
score_gekis = Serializable(0, TYPE_USHORT)
score_katus = Serializable(0, TYPE_USHORT)
score_miss = Serializable(0, TYPE_USHORT)
score = Serializable(0, TYPE_INT)
combo = Serializable(0, TYPE_USHORT)
perfect = Serializable(True, TYPE_BOOL)
mods = Serializable(osuMods.NOMOD, TYPE_INT)
lifebar_graph = Serializable(b"0|1,", TYPE_STRING)
timestamp = Serializable(0, TYPE_ULLONG)
replay_data = Serializable(b"", TYPE_BYTESTREAM, post = lzma_compress)
online_score_id = Serializable(0, TYPE_ULLONG)
# Marker (Only used as attribute section marker for auto-serializing)
end_of_attributes = None
def __init__(self, **kwargs):
allowed_kwargs = {
"mode": self.set_mode,
"mods": self.set_mods,
"osu_version": self.set_osu_version,
"player_name": self.set_player_name,
"replay_data": self.write
}
for k, v in kwargs.items():
if k in allowed_kwargs.keys():
allowed_kwargs[k](v) # Run callback func
# Set validators -----------------------------------------------------------
def set_mode(self, mode):
if mode > 3 or mode < 0:
raise Exception("Invalid mode")
self.mode.value = mode
def set_osu_version(self, osu_version):
if type(osu_version) is not int:
raise Exception("osu! version must be an int")
self.osu_version.value = osu_version
def set_beatmap_hash(self, md5_hash):
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid beatmap hash")
self.beatmap_hash.value = md5_hash
def set_beatmap(self, filepath):
if not isfile(filepath):
raise Exception("Beatmap file not found")
with open(filepath, "rb") as f:
self.beatmap_hash.value = md5_file(f)
def set_player_name(self, player_name):
self.player_name.value = player_name
def set_score_hash(self, md5_hash):
if type(md5_hash) is bytes:
md5_hash = md5_hash.decode()
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid replay hash")
self.score_hash.value = md5_hash.encode()
def set_mods(self, mods):
if mods < 0 or mods > (1 << 30) - 1:
raise Exception("Mods are out of range")
self.mods.value = mods
# --------------------------------------------------------------------------
# Update variables ---------------------------------------------------------
def update_perfect(self):
m = [self.score_100s.value, self.score_50s.value, self.score_katus.value, self.score_miss.value]
self.perfect.value = sum(m) == 0
def update_score_hash(self):
self.set_score_hash(
md5_str("%d%s%s%s%d%d" % (
self.combo.value, "osu", self.player_name.value,
self.beatmap_hash.value, self.score.value, self.get_rank()
))
)
def update(self):
self.update_perfect()
self.update_score_hash()
# --------------------------------------------------------------------------
# Get / Helpers ------------------------------------------------------------
def get_hits(self):
return sum([self.score_300s.value, self.score_100s.value, self.score_50s.value])
def get_possible_hits(self):
return self.get_hits() + self.score_miss.value
def get_rank(self): # We dont give out F ranks around here
hits = self.get_possible_hits()
if hits == 0:
raise Exception("Can not calculate rank without any score data")
r300 = self.score_300s.value / hits
r50 = self.score_50s.value / hits
h = osuMods.any_enabled(self.mods.value, osuMods.HIDDEN | osuMods.FLASHLIGHT)
if r300 == 1:
return osuRanks.SSH if h else osuRanks.SS
if r300 > .9 and r50 <= .01 and self.score_miss.value == 0:
return osuRanks.SH if h else osuRanks.S
if r300 > .8 and self.score_miss.value == 0 or r300 > .9:
return osuRanks.A
if r300 > .7 and self.score_miss.value == 0 or r300 > .8:
return osuRanks.B
if r300 > .6:
return osuRanks.C
return osuRanks.D
# --------------------------------------------------------------------------
# Write replay data --------------------------------------------------------
def write(self, frame):
if type(frame) is list:
for _frame in frame: # (frames)
self.write(_frame)
return
self.replay_data.value += bytes(frame)
# --------------------------------------------------------------------------
# IO replay data -----------------------------------------------------------
def save(self, filename):
self.update()
serializer = Serializer()
attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
for attrib in attribs:
if attrib == "end_of_attributes":
break
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
with open(filename, "wb") as f:
f.write( serializer.flush() )