243 lines
8.3 KiB
Python
243 lines
8.3 KiB
Python
import struct
|
|
import string
|
|
#import lzma
|
|
import pylzma # Required cause pythons standard library lzma has missing support
|
|
|
|
from .frame import ReplayFrame
|
|
|
|
from .helpers import osuModes
|
|
from .helpers import osuMods
|
|
from .helpers import osuRanks
|
|
|
|
from .helpers.typeSerializer import *
|
|
|
|
from io import BufferedReader
|
|
from os.path import isfile
|
|
from hashlib import md5 as _md5
|
|
|
|
import traceback
|
|
|
|
def md5_str(data):
    """Return the MD5 hex digest of *data* as ASCII bytes.

    Args:
        data: ``bytes`` to hash, or a ``str`` which is UTF-8 encoded first.

    Returns:
        The 32-character hexadecimal digest, encoded as ``bytes``.
    """
    if isinstance(data, str):
        # UTF-8 yields the same bytes as ASCII for ASCII-only text, and it
        # does not raise on characters outside that range.
        data = data.encode("utf-8")
    return _md5(data).hexdigest().encode()
|
|
|
|
def md5_file(file):
    """Return the MD5 hex digest of what is left to read from a binary file.

    The file is consumed in 4096-byte chunks starting at its current
    position, so the whole content never has to sit in memory at once.

    Args:
        file: A ``BufferedReader``, e.g. the object returned by
            ``open(path, "rb")``.

    Returns:
        The 32-character hexadecimal digest, encoded as ``bytes``.

    Raises:
        TypeError: If *file* is not a ``BufferedReader``.
        ValueError: If *file* does not report the mode ``"rb"``.
    """
    if not isinstance(file, BufferedReader):
        raise TypeError("File is not a BufferedReader")

    # A reader wrapped around a raw stream without a ``mode`` attribute is
    # rejected here instead of escaping as an AttributeError.
    if getattr(file, "mode", None) != "rb":
        raise ValueError("File is not in ReadBytes mode")

    digest = _md5()
    for chunk in iter(lambda: file.read(4096), b""):
        digest.update(chunk)
    return digest.hexdigest().encode()
|
|
|
|
def append_and_compress(data):
    """Append the fixed trailing frame to *data* and compress the result.

    Serves as the ``post`` hook of the ``replay_data`` field.
    """
    # NOTE(review): presumably the closing frame the replay format expects
    # at the end of the stream - confirm against the format description.
    terminated = data + b"-12345|0|0|1337,"
    return lzma_compress(terminated)
|
|
|
|
def lzma_compress(data):
    """Compress *data* with pylzma and wrap it in a length-prefixed blob.

    Layout of the returned bytes: a little-endian uint32 with the size of
    everything that follows, the first 5 bytes produced by pylzma, the
    uncompressed length as a little-endian uint64, then the remainder of
    the pylzma output.
    """
    packed = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False)
    header, stream = packed[:5], packed[5:]
    # The uncompressed size is spliced in right after the 5-byte header
    size_field = struct.pack(b"<Q", len(data))
    body = header + size_field + stream
    # The total length is a fixed 4-byte integer here, not a ULEB value
    length_prefix = struct.pack("<I", len(body))
    return length_prefix + body
|
|
|
|
class Replay:
    """Builder for an osu! replay file.

    Every field is stored in ``self.structure`` as a ``Serializable``.
    ``save`` refreshes the derived fields, feeds all fields to a
    ``Serializer`` in declaration order and writes the result to disk.
    """

    @staticmethod
    def _default_structure():
        """Return a newly built field table holding the default values."""
        return {
            "mode": Serializable(osuModes.STANDARD, TYPE_BYTE),
            "osu_version": Serializable(20181216, TYPE_INT),
            "beatmap_hash": Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING),
            "player_name": Serializable(b"osu!", TYPE_STRING),
            "score_hash": Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING),

            "score_300s": Serializable(0, TYPE_USHORT),
            "score_100s": Serializable(0, TYPE_USHORT),
            "score_50s": Serializable(0, TYPE_USHORT),
            "score_gekis": Serializable(0, TYPE_USHORT),
            "score_katus": Serializable(0, TYPE_USHORT),
            "score_miss": Serializable(0, TYPE_USHORT),
            "score": Serializable(0, TYPE_INT),
            "combo": Serializable(0, TYPE_USHORT),
            "perfect": Serializable(True, TYPE_BOOL),

            "mods": Serializable(osuMods.NOMOD, TYPE_INT),

            "lifebar_graph": Serializable(b"0|1,", TYPE_STRING),
            "timestamp": Serializable(0, TYPE_ULLONG),

            "replay_data": Serializable(b"", TYPE_BYTESTREAM, post = append_and_compress),

            "online_score_id": Serializable(0, TYPE_ULLONG),
        }

    # Class-level defaults, kept so that existing ``Replay.structure``
    # lookups keep working. Instances never write to this table.
    structure = _default_structure.__func__()

    def __init__(self, **kwargs):
        """Create a replay, optionally pre-filling fields from keywords.

        Recognised keywords are ``mode``, ``mods``, ``osu_version``,
        ``beatmap_hash``, ``player_name`` and ``replay_data``. Each value is
        passed to the matching setter (``replay_data`` goes to ``write``).
        Unrecognised keywords are ignored.
        """
        # Every replay gets its own Serializable objects; writing through
        # the class-level table would leak values between instances.
        self.structure = self._default_structure()

        setters = {
            "mode": self.set_mode,
            "mods": self.set_mods,
            "osu_version": self.set_osu_version,
            "beatmap_hash": self.set_beatmap_hash,
            "player_name": self.set_player_name,
            "replay_data": self.write,
        }
        for key, value in kwargs.items():
            setter = setters.get(key)
            if setter is not None:
                setter(value)

    # --------------------------------------------------------------------------
    # Set validators -----------------------------------------------------------

    @staticmethod
    def _normalize_md5(md5_hash, error_message):
        """Validate a 32-character hex digest and return it as ``bytes``.

        Raises:
            Exception: With *error_message* when the digest is not exactly
                32 hexadecimal characters.
        """
        if isinstance(md5_hash, bytes):
            md5_hash = md5_hash.decode()
        if len(md5_hash) != 32 or any(c not in string.hexdigits for c in md5_hash):
            raise Exception(error_message)
        return md5_hash.encode()

    def set_mode(self, mode):
        """Set the game mode; raises ``Exception`` unless 0 <= mode <= 3."""
        if mode < 0 or mode > 3:
            raise Exception("Invalid mode")
        self.structure["mode"].value = mode

    def set_osu_version(self, osu_version):
        """Set the client version; raises ``Exception`` if it is not an int."""
        if type(osu_version) is not int:
            raise Exception("osu! version must be an int")
        self.structure["osu_version"].value = osu_version

    def set_beatmap_hash(self, md5_hash):
        """Set the beatmap hash from a 32-character hex ``str`` or ``bytes``.

        Raises:
            Exception: If the value is not a valid 32-character hex digest.
        """
        self.structure["beatmap_hash"].value = self._normalize_md5(
            md5_hash, "Invalid beatmap hash"
        )

    def set_beatmap_file(self, filepath):
        """Set the beatmap hash to the MD5 of the file at *filepath*.

        Raises:
            Exception: If *filepath* is not an existing file.
        """
        if not isfile(filepath):
            raise Exception("Beatmap file not found")
        with open(filepath, "rb") as f:
            self.structure["beatmap_hash"].value = md5_file(f)

    def set_player_name(self, player_name):
        """Set the player name; a ``str`` is encoded to ``bytes`` first."""
        if isinstance(player_name, str):
            player_name = player_name.encode()
        self.structure["player_name"].value = player_name

    def set_score_hash(self, md5_hash):
        """Set the score hash from a 32-character hex ``str`` or ``bytes``.

        Raises:
            Exception: If the value is not a valid 32-character hex digest.
        """
        self.structure["score_hash"].value = self._normalize_md5(
            md5_hash, "Invalid replay hash"
        )

    def set_score(self, score = None, combo = None, s300 = None, s100 = None, s50 = None, sgekis = None, skatus = None, miss = None):
        """Update any subset of the score counters.

        Only arguments whose type is exactly ``int`` are stored; everything
        else (including the ``None`` defaults) leaves the field untouched.
        """
        updates = {
            "score": score,
            "combo": combo,
            "score_300s": s300,
            "score_100s": s100,
            "score_50s": s50,
            "score_gekis": sgekis,
            "score_katus": skatus,
            "score_miss": miss,
        }
        for key, value in updates.items():
            # Exact type check on purpose: bool must not count as an int here
            if type(value) is int:
                self.structure[key].value = value

    def set_mods(self, mods):
        """Set the mod bitmask; raises ``Exception`` outside 0 .. 2**30 - 1."""
        if mods < 0 or mods > (1 << 30) - 1:
            raise Exception("Mods are out of range")
        self.structure["mods"].value = mods

    def set_lifebar_graph(self, graph):
        """Set the lifebar graph from a list of ``bytes``, a ``str`` or ``bytes``.

        Raises:
            Exception: If *graph* is none of the supported types.
        """
        if isinstance(graph, list):
            value = b"".join(graph)
        elif isinstance(graph, str):
            value = graph.encode()
        elif isinstance(graph, bytes):
            value = graph
        else:
            raise Exception("Invalid lifebar data")
        self.structure["lifebar_graph"].value = value

    def set_timestamp(self, timestamp, use_unix = True):
        """Store the replay timestamp.

        Args:
            timestamp: The value to store, converted first when *use_unix*
                is true.
            use_unix: When true, a fixed offset is added and the sum is
                multiplied by 10**4 before storing.
        """
        if use_unix:
            # NOTE(review): looks like a Unix-milliseconds to 100 ns tick
            # conversion, but the offset is not the usual 62135596800000 ms
            # epoch gap - confirm before relying on exact times.
            timestamp += 62135599380000 # offset
            timestamp *= 10 ** 4

        self.structure["timestamp"].value = timestamp

    def set_online_score_id(self, online_score_id):
        """Set the online score id (stored without validation)."""
        self.structure["online_score_id"].value = online_score_id

    # --------------------------------------------------------------------------
    # Update variables ---------------------------------------------------------

    def update_perfect(self):
        """Set ``perfect`` to whether the 100, 50, katu and miss counts sum to zero."""
        imperfect = [
            self.structure["score_100s"].value,
            self.structure["score_50s"].value,
            self.structure["score_katus"].value,
            self.structure["score_miss"].value,
        ]
        self.structure["perfect"].value = sum(imperfect) == 0

    def update_score_hash(self):
        """Recompute the score hash from the current field values.

        The hashed input is the combo, the literal ``b"osu"``, the player
        name, the beatmap hash, the score and the rank, concatenated.
        """
        payload = b"%d%b%b%b%d%b" % (
            self.structure["combo"].value, b"osu", self.structure["player_name"].value,
            self.structure["beatmap_hash"].value, self.structure["score"].value, self.get_rank()
        )
        self.set_score_hash(md5_str(payload))

    def update(self):
        """Refresh every derived field (``perfect`` and the score hash)."""
        self.update_perfect()
        self.update_score_hash()

    # --------------------------------------------------------------------------
    # Get / Helpers ------------------------------------------------------------

    def get_hits(self):
        """Return the combined number of 300s, 100s and 50s."""
        return sum([
            self.structure["score_300s"].value,
            self.structure["score_100s"].value,
            self.structure["score_50s"].value,
        ])

    def get_possible_hits(self):
        """Return ``get_hits()`` plus the miss count."""
        return self.get_hits() + self.structure["score_miss"].value

    def get_rank(self):
        """Return the ``osuRanks`` member matching the current counters.

        With no hits and no misses recorded a notice is printed and
        ``osuRanks.F`` is returned. Otherwise the rank follows from the
        ratio of 300s and 50s to all possible hits and from the miss count;
        the SS and S ranks switch to their ``H`` variants when Hidden or
        Flashlight is enabled.
        """
        hits = self.get_possible_hits()
        if hits == 0:
            print("Can not calculate rank without any score data (Defaulting to Fail)")
            return osuRanks.F

        misses = self.structure["score_miss"].value
        r300 = self.structure["score_300s"].value / hits
        r50 = self.structure["score_50s"].value / hits
        h = osuMods.any_enabled(self.structure["mods"].value, osuMods.HIDDEN | osuMods.FLASHLIGHT)

        if r300 == 1:
            return osuRanks.SSH if h else osuRanks.SS
        if r300 > .9 and r50 <= .01 and misses == 0:
            return osuRanks.SH if h else osuRanks.S
        if (r300 > .8 and misses == 0) or r300 > .9:
            return osuRanks.A
        if (r300 > .7 and misses == 0) or r300 > .8:
            return osuRanks.B
        if r300 > .6:
            return osuRanks.C
        return osuRanks.D

    # --------------------------------------------------------------------------
    # Write replay data --------------------------------------------------------

    def write(self, frame):
        """Append frame data to the replay stream.

        Args:
            frame: A ``ReplayFrame`` (converted with ``bytes()``), raw
                ``bytes``, a ``str`` (encoded first), or a list of any of
                these, which is written element by element.

        Raises:
            Exception: If *frame* is none of the supported types.
        """
        if isinstance(frame, list):
            for _frame in frame: # (frames)
                self.write(_frame)
            return

        if isinstance(frame, ReplayFrame):
            chunk = bytes(frame)
        elif isinstance(frame, bytes):
            chunk = frame
        elif isinstance(frame, str):
            chunk = frame.encode()
        else:
            raise Exception("Invalid frame data")
        self.structure["replay_data"].value += chunk

    # --------------------------------------------------------------------------
    # IO replay data -----------------------------------------------------------

    def save(self, filename):
        """Refresh the derived fields and write the serialized replay to *filename*."""
        self.update()

        serializer = Serializer()
        for field in self.structure.values(): # Add values to serializer
            serializer.add(field)

        # Serialize before opening, so a failure cannot leave behind a
        # truncated file.
        payload = serializer.flush()
        with open(filename, "wb") as f:
            f.write(payload)
|