osu-repy/osuRepy/replay.py
2019-09-01 14:14:33 +00:00

245 lines
8.4 KiB
Python

import struct
import string
#import lzma
import pylzma # Required cause pythons standard library lzma has missing support
from collections import OrderedDict
from .frame import ReplayFrame
from .helpers import osuModes
from .helpers import osuMods
from .helpers import osuRanks
from .helpers.typeSerializer import *
from io import BufferedReader
from os.path import isfile
from hashlib import md5 as _md5
import traceback
def md5_str(data):
print(data)
if type(data) is str:
data = data.encode("ascii")
print(_md5(data).hexdigest().encode())
return _md5(data).hexdigest().encode()
def md5_file(file):
if type(file) is not BufferedReader:
raise Exception("File is not a BufferedReader")
if file.mode != "rb":
raise Exception("File is not in ReadBytes mode")
hash = _md5()
for chunk in iter(lambda: file.read(4096), b""):
hash.update(chunk)
return hash.hexdigest().encode()
def append_and_compress(data):
return lzma_compress(data + b"-12345|0|0|1337,")
def lzma_compress(data):
comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False)
comp = comp[:5] + struct.pack(b"<Q", len(data)) + comp[5:] # Append uncompressed length
return struct.pack("<I", len(comp)) + comp # For some odd ass reason this doesnt use ULEB
class Replay:
structure = OrderedDict([
("mode", Serializable(osuModes.STANDARD, TYPE_BYTE)),
("osu_version", Serializable(20181216, TYPE_INT)),
("beatmap_hash",Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)),
("player_name", Serializable(b"osu!", TYPE_STRING)),
("score_hash", Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)),
("score_300s", Serializable(0, TYPE_USHORT)),
("score_100s", Serializable(0, TYPE_USHORT)),
("score_50s", Serializable(0, TYPE_USHORT)),
("score_gekis", Serializable(0, TYPE_USHORT)),
("score_katus", Serializable(0, TYPE_USHORT)),
("score_miss", Serializable(0, TYPE_USHORT)),
("score", Serializable(0, TYPE_INT)),
("combo", Serializable(0, TYPE_USHORT)),
("perfect", Serializable(True, TYPE_BOOL)),
("mods", Serializable(osuMods.NOMOD, TYPE_INT)),
("lifebar_graph", Serializable(b"0|1,", TYPE_STRING)),
("timestamp", Serializable(0, TYPE_ULLONG)),
("replay_data", Serializable(b"", TYPE_BYTESTREAM, post = append_and_compress)),
("online_score_id", Serializable(0, TYPE_ULLONG))
])
def __init__(self, **kwargs):
allowed_kwargs = {
"mode": self.set_mode,
"mods": self.set_mods,
"osu_version": self.set_osu_version,
"beatmap_hash": self.set_beatmap_hash,
"player_name": self.set_player_name,
"replay_data": self.write
}
for k, v in kwargs.items():
if k in allowed_kwargs.keys():
allowed_kwargs[k](v) # Run callback func
# --------------------------------------------------------------------------
# Set validators -----------------------------------------------------------
def set_mode(self, mode):
if mode > 3 or mode < 0:
raise Exception("Invalid mode")
self.structure["mode"].value = mode
def set_osu_version(self, osu_version):
if type(osu_version) is not int:
raise Exception("osu! version must be an int")
self.structure["osu_version"].value = osu_version
def set_beatmap_hash(self, md5_hash):
if type(md5_hash) is bytes:
md5_hash = md5_hash.decode()
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid beatmap hash")
self.structure["beatmap_hash"].value = md5_hash.encode()
def set_beatmap_file(self, filepath):
if not isfile(filepath):
raise Exception("Beatmap file not found")
with open(filepath, "rb") as f:
self.structure["beatmap_hash"].value = md5_file(f)
def set_player_name(self, player_name):
if type(player_name) is str:
player_name = player_name.encode()
self.structure["player_name"].value = player_name
def set_score_hash(self, md5_hash):
if type(md5_hash) is bytes:
md5_hash = md5_hash.decode()
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid replay hash")
self.structure["score_hash"].value = md5_hash.encode()
def set_score(self, score = None, combo = None, s300 = None, s100 = None, s50 = None, sgekis = None, skatus = None, miss = None):
if type(score) is int: self.structure["score"].value = score
if type(combo) is int: self.structure["combo"].value = combo
if type(s300) is int: self.structure["score_300s"].value = s300
if type(s100) is int: self.structure["score_100s"].value = s100
if type(s50) is int: self.structure["score_50s"].value = s50
if type(sgekis) is int: self.structure["score_gekis"].value = sgekis
if type(skatus) is int: self.structure["score_katus"].value = skatus
if type(miss) is int: self.structure["score_miss"].value = miss
def set_mods(self, mods):
if mods < 0 or mods > (1 << 30) - 1:
raise Exception("Mods are out of range")
self.structure["mods"].value = mods
def set_lifebar_graph(self, graph):
t_graph = type(graph)
if t_graph is list:
self.structure["lifebar_graph"].value = b"".join(graph)
elif t_graph is str:
self.structure["lifebar_graph"].value = graph.encode()
elif t_graph is bytes:
self.structure["lifebar_graph"].value = graph
else:
raise Exception("Invalid lifebar data")
def set_timestamp(self, timestamp, use_unix = True):
if use_unix:
timestamp += 62135599380000 # offset
timestamp *= 10 ** 4
self.structure["timestamp"].value = timestamp
def set_online_score_id(self, online_score_id):
self.structure["online_score_id"].value = online_score_id
# --------------------------------------------------------------------------
# Update variables ---------------------------------------------------------
def update_perfect(self):
m = [self.structure["score_100s"].value, self.structure["score_50s"].value, self.structure["score_katus"].value, self.structure["score_miss"].value]
self.structure["perfect"].value = sum(m) == 0
def update_score_hash(self):
self.set_score_hash(
md5_str(b"%d%b%b%b%d%b" % (
self.structure["combo"].value, b"osu", self.structure["player_name"].value,
self.structure["beatmap_hash"].value, self.structure["score"].value, self.get_rank()
))
)
def update(self):
self.update_perfect()
self.update_score_hash()
# --------------------------------------------------------------------------
# Get / Helpers ------------------------------------------------------------
def get_hits(self):
return sum([self.structure["score_300s"].value, self.structure["score_100s"].value, self.structure["score_50s"].value])
def get_possible_hits(self):
return self.get_hits() + self.structure["score_miss"].value
def get_rank(self):
hits = self.get_possible_hits()
if hits == 0:
print("Can not calculate rank without any score data (Defaulting to Fail)")
return osuRanks.F
r300 = self.structure["score_300s"].value / hits
r50 = self.structure["score_50s"].value / hits
h = osuMods.any_enabled(self.structure["mods"].value, osuMods.HIDDEN | osuMods.FLASHLIGHT)
if r300 == 1:
return osuRanks.SSH if h else osuRanks.SS
if r300 > .9 and r50 <= .01 and self.structure["score_miss"].value == 0:
return osuRanks.SH if h else osuRanks.S
if r300 > .8 and self.structure["score_miss"].value == 0 or r300 > .9:
return osuRanks.A
if r300 > .7 and self.structure["score_miss"].value == 0 or r300 > .8:
return osuRanks.B
if r300 > .6:
return osuRanks.C
return osuRanks.D
# --------------------------------------------------------------------------
# Write replay data --------------------------------------------------------
def write(self, frame):
t_frame = type(frame)
if t_frame is list:
for _frame in frame: # (frames)
self.write(_frame)
return
if t_frame is ReplayFrame:
self.structure["replay_data"].value += bytes(frame)
elif t_frame is bytes:
self.structure["replay_data"].value += frame
elif t_frame is str:
self.structure["replay_data"].value += frame.encode()
else:
raise Exception("Invalid frame data")
# --------------------------------------------------------------------------
# IO replay data -----------------------------------------------------------
def save(self, filename):
self.update()
serializer = Serializer()
[ serializer.add( struc ) for struc in self.structure.values() ] # Add values to serializer
with open(filename, "wb") as f:
f.write( serializer.flush() )