Compare commits

..

5 Commits

Author SHA1 Message Date
ad7c608df6 Use OrderedDict 2019-09-01 14:14:33 +00:00
8b7ebbe7e0 Update 'osuRepy/replay.py' 2019-09-01 13:55:19 +00:00
65436df5d0 Wops.. values instead of items >.< 2019-09-01 13:47:38 +00:00
088674041a Dont use scuffed method
it makes no sense to do what I did before
2019-09-01 13:44:47 +00:00
abbc9bb2f0 Update 'README.md'
There was an attempt
2019-08-25 15:39:04 +00:00
5 changed files with 119 additions and 199 deletions

View File

@ -27,7 +27,7 @@ replay.save("myReplay.osr") # Export replay to file
# Enums (from helpers) # Enums (from helpers)
### osuButtons ### osuButtons
|Name|Value| |Name|Value|
|-|-| |--|--|
|NONE|0| |NONE|0|
|LEFTMOUSE|1| |LEFTMOUSE|1|
|RIGHTMOUSE|2| |RIGHTMOUSE|2|
@ -41,7 +41,7 @@ replay.save("myReplay.osr") # Export replay to file
### osuModes ### osuModes
|Name|Value| |Name|Value|
|-|-| |--|--|
|STANDARD|0| |STANDARD|0|
|TAIKO|1| |TAIKO|1|
|CATCH_THE_BEAT|2| |CATCH_THE_BEAT|2|
@ -49,7 +49,7 @@ replay.save("myReplay.osr") # Export replay to file
### osuMods ### osuMods
|Name|Value| |Name|Value|
|-|-| |--|--|
|NOMOD|0| |NOMOD|0|
|NOFAIL|1| |NOFAIL|1|
|EASY|2| |EASY|2|
@ -84,7 +84,7 @@ replay.save("myReplay.osr") # Export replay to file
### osuRanks ### osuRanks
|Name|Value| |Name|Value|
|-|-| |--|--|
|SSH|0| |SSH|0|
|SH|1| |SH|1|
|SS|2| |SS|2|

View File

@ -35,6 +35,6 @@ class ReplayFrame:
self._float_convert(self.x), self._float_convert(self.x),
self._float_convert(self.y), self._float_convert(self.y),
self.buttons) self.buttons)
def __bytes__(self): def __bytes__(self):
return str(self).encode() return str(self).encode()

View File

@ -1,21 +1,20 @@
import struct import struct
TYPE_CHAR = b"b", 1 TYPE_CHAR = b"b"
TYPE_BYTE = b"B", 1 TYPE_BYTE = b"B"
TYPE_BOOL = b"?", 1 TYPE_BOOL = b"?"
TYPE_SHORT = b"h", 2 TYPE_SHORT = b"h"
TYPE_USHORT = b"H", 2 TYPE_USHORT = b"H"
TYPE_INT = b"i", 4 TYPE_INT = b"i"
TYPE_UINT = b"I", 4 TYPE_UINT = b"I"
TYPE_LONG = b"l", 4 TYPE_LONG = b"l"
TYPE_ULONG = b"L", 4 TYPE_ULONG = b"L"
TYPE_LLONG = b"q", 8 TYPE_LLONG = b"q"
TYPE_ULLONG = b"Q", 8 TYPE_ULLONG = b"Q"
TYPE_FLOAT = b"f", 4 TYPE_FLOAT = b"f"
TYPE_DOUBLE = b"d", 8 TYPE_DOUBLE = b"d"
TYPE_STRING = b"s", 1 TYPE_STRING = b"B%ds"
TYPE_BYTESTREAM = b"%ds"
TYPE_ULEB = b"s", 1, True # True just so they are different tuples
BYTEORDER_NATIVE = b"@" BYTEORDER_NATIVE = b"@"
BYTEORDER_LITTLE = b"<" BYTEORDER_LITTLE = b"<"
@ -23,6 +22,7 @@ BYTEORDER_BIG = b">"
def encode_uleb(val): def encode_uleb(val):
data = b"" data = b""
pos = 0
while val != 0: while val != 0:
b = val & 0x7f b = val & 0x7f
val >>= 7 val >>= 7
@ -73,18 +73,10 @@ def guess_type(value):
raise Exception("Unable to guess type") raise Exception("Unable to guess type")
def read(value_type, stream):
#if value_type == TYPE_STRING:
# read()
struct.unpack(value_type[0], stream[:value_type[1]])
class Serializable: class Serializable:
length = None post_serialized = None
prefix = b""
filter = None
def __init__(self, value, type = None, length = None, prefix = b"", filter = None): def __init__(self, value, type = None, post = None):
self.value = value self.value = value
if type is not None: if type is not None:
@ -92,76 +84,29 @@ class Serializable:
else: else:
self.type = guess_type(type) self.type = guess_type(type)
self.length = length self.post_serialized = post
self.prefix = prefix
self.filter = filter
def get_struct_fmt(self, length = 1, uleb_length_override = -1): def get_struct_fmt(self):
struct_fmt = self.type[0] if self.type in [TYPE_STRING, TYPE_BYTESTREAM]:
if length > 1: return self.type % len(self.get_value()[-1])
struct_fmt = b"%d%b" % (length, self.type[0]) return self.type
if self.length is not None:
if self.length == TYPE_ULEB:
struct_fmt = b"%d%b%b" % (len(self.get_value()[0]) if uleb_length_override == -1 else uleb_length_override, self.length[0], struct_fmt)
else:
struct_fmt = b"%b%b" % (self.length[0], struct_fmt)
return struct_fmt
def get_value(self): def get_value(self):
val = [self.value] def _get_value():
if self.filter is not None: if self.type == TYPE_STRING:
val[-1] = self.filter(val[-1], 0) return [0x0b, encode_uleb(len(self.value)) + self.value]
if self.length is not None: return [self.value]
if self.length == TYPE_ULEB:
val.insert(0, encode_uleb(len(val[-1]))) val = _get_value()
else: if self.post_serialized is not None:
val.insert(0, len(val[-1])) val[-1] = self.post_serialized(val[-1])
return val return val
def pack(self, byte_order = BYTEORDER_LITTLE): def pack(self, byte_order = BYTEORDER_LITTLE):
length = 1 struct_fmt = byte_order + self.get_struct_fmt()
if self.type == TYPE_STRING: #print(struct_fmt, self.get_value())
length = len(self.get_value()[-1]) return struct.pack(struct_fmt, *self.get_value())
struct_fmt = byte_order + self.get_struct_fmt(length)
packed = struct.pack(struct_fmt, *self.get_value())
if self.prefix is not None:
packed = self.prefix + packed
return packed
def unpack(self, stream, byte_order = BYTEORDER_LITTLE):
length = 1
read_pos = 0
extra_read = 0
remove_struct_fmt = []
if self.type == TYPE_STRING:
if self.length == TYPE_ULEB:
extra_read = 1
read_pos += 1 # Skip 0x0b
length, _read_pos = decode_uleb(stream[read_pos:])
read_pos += _read_pos - 1
else:
l = Serializable(0, self.length)
read_pos = l.unpack(stream)
length = l.get_value()[-1]
remove_struct_fmt.append(0)
struct_fmt = byte_order + self.get_struct_fmt(length, read_pos)
length += extra_read
if len(remove_struct_fmt):
for rem in remove_struct_fmt:
struct_fmt = struct_fmt[:rem + 1] + struct_fmt[rem + 2:]
data = struct.unpack(struct_fmt, stream[read_pos:read_pos + self.type[1] * length])
if self.filter is not None:
data = list(data)
data[-1] = self.filter(data[-1], 1)
self.value = data[-1]
return read_pos + self.type[1] * length
class Serializer: class Serializer:
def __init__(self, byte_order = BYTEORDER_LITTLE): def __init__(self, byte_order = BYTEORDER_LITTLE):
@ -174,13 +119,6 @@ class Serializer:
else: else:
self.stack.append(serializable) self.stack.append(serializable)
def serialize(self):
return b"".join(s.pack() for s in self.stack)
def deserialize(self, stream):
for s in self.stack:
stream = stream[s.unpack(stream):]
def flush(self): def flush(self):
return b"".join(s.pack() for s in self.stack) return b"".join(s.pack() for s in self.stack)

View File

@ -1,8 +1,10 @@
import struct import struct
import string import string
import lzma #import lzma
import pylzma # Required cause pythons standard library lzma has missing support import pylzma # Required cause pythons standard library lzma has missing support
from collections import OrderedDict
from .frame import ReplayFrame from .frame import ReplayFrame
from .helpers import osuModes from .helpers import osuModes
@ -18,8 +20,10 @@ from hashlib import md5 as _md5
import traceback import traceback
def md5_str(data): def md5_str(data):
print(data)
if type(data) is str: if type(data) is str:
data = data.encode("ascii") data = data.encode("ascii")
print(_md5(data).hexdigest().encode())
return _md5(data).hexdigest().encode() return _md5(data).hexdigest().encode()
def md5_file(file): def md5_file(file):
@ -32,56 +36,43 @@ def md5_file(file):
hash = _md5() hash = _md5()
for chunk in iter(lambda: file.read(4096), b""): for chunk in iter(lambda: file.read(4096), b""):
hash.update(chunk) hash.update(chunk)
return hash.hexdigest() return hash.hexdigest().encode()
def append_and_compress(data, mode): def append_and_compress(data):
if mode == 0: return lzma_compress(data + b"-12345|0|0|1337,")
return lzma_compress(data + b"-12345|0|0|1337,")
else:
return lzma_decompress(data)[:-len(b"-12345|0|0|1337,")]
def lzma_compress(data): def lzma_compress(data):
comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False) comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False)
comp = comp[:5] + struct.pack(b"<Q", len(data)) + comp[5:] # Append uncompressed length comp = comp[:5] + struct.pack(b"<Q", len(data)) + comp[5:] # Append uncompressed length
return comp return struct.pack("<I", len(comp)) + comp # For some odd ass reason this doesnt use ULEB
def lzma_decompress(data):
comp = lzma.decompress(data)
return comp
class Replay: class Replay:
_mode = Serializable(osuModes.STANDARD, TYPE_BYTE) structure = OrderedDict([
_osu_version = Serializable(20181216, TYPE_INT) ("mode", Serializable(osuModes.STANDARD, TYPE_BYTE)),
#_beatmap_hash_length = Serializable(32, TYPE_ULEB) ("osu_version", Serializable(20181216, TYPE_INT)),
_beatmap_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b") ("beatmap_hash",Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)),
#_player_name_length = Serializable(4, TYPE_ULEB) ("player_name", Serializable(b"osu!", TYPE_STRING)),
_player_name = Serializable(b"osu!", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b") ("score_hash", Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)),
#_score_hash_length = Serializable(32, TYPE_ULEB)
_score_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
_score_300s = Serializable(0, TYPE_USHORT) ("score_300s", Serializable(0, TYPE_USHORT)),
_score_100s = Serializable(0, TYPE_USHORT) ("score_100s", Serializable(0, TYPE_USHORT)),
_score_50s = Serializable(0, TYPE_USHORT) ("score_50s", Serializable(0, TYPE_USHORT)),
_score_gekis = Serializable(0, TYPE_USHORT) ("score_gekis", Serializable(0, TYPE_USHORT)),
_score_katus = Serializable(0, TYPE_USHORT) ("score_katus", Serializable(0, TYPE_USHORT)),
_score_miss = Serializable(0, TYPE_USHORT) ("score_miss", Serializable(0, TYPE_USHORT)),
_score = Serializable(0, TYPE_INT) ("score", Serializable(0, TYPE_INT)),
_combo = Serializable(0, TYPE_USHORT) ("combo", Serializable(0, TYPE_USHORT)),
_perfect = Serializable(True, TYPE_BOOL) ("perfect", Serializable(True, TYPE_BOOL)),
_mods = Serializable(osuMods.NOMOD, TYPE_INT) ("mods", Serializable(osuMods.NOMOD, TYPE_INT)),
#_lifebar_graph_length = Serializable(4, TYPE_ULEB) ("lifebar_graph", Serializable(b"0|1,", TYPE_STRING)),
_lifebar_graph = Serializable(b"0|1,", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b") ("timestamp", Serializable(0, TYPE_ULLONG)),
_timestamp = Serializable(0, TYPE_ULLONG)
#_replay_data_length = Serializable(0, TYPE_UINT) ("replay_data", Serializable(b"", TYPE_BYTESTREAM, post = append_and_compress)),
_replay_data = Serializable(b"", TYPE_STRING, length = TYPE_UINT, filter = append_and_compress)
_online_score_id = Serializable(0, TYPE_ULLONG) ("online_score_id", Serializable(0, TYPE_ULLONG))
])
# Marker (Only used as attribute section marker for auto-serializing)
_end_of_attributes = None
def __init__(self, **kwargs): def __init__(self, **kwargs):
allowed_kwargs = { allowed_kwargs = {
@ -102,12 +93,12 @@ class Replay:
def set_mode(self, mode): def set_mode(self, mode):
if mode > 3 or mode < 0: if mode > 3 or mode < 0:
raise Exception("Invalid mode") raise Exception("Invalid mode")
self._mode.value = mode self.structure["mode"].value = mode
def set_osu_version(self, osu_version): def set_osu_version(self, osu_version):
if type(osu_version) is not int: if type(osu_version) is not int:
raise Exception("osu! version must be an int") raise Exception("osu! version must be an int")
self._osu_version.value = osu_version self.structure["osu_version"].value = osu_version
def set_beatmap_hash(self, md5_hash): def set_beatmap_hash(self, md5_hash):
if type(md5_hash) is bytes: if type(md5_hash) is bytes:
@ -115,49 +106,49 @@ class Replay:
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32: if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid beatmap hash") raise Exception("Invalid beatmap hash")
self._beatmap_hash.value = md5_hash.encode() self.structure["beatmap_hash"].value = md5_hash.encode()
def set_beatmap_file(self, filepath): def set_beatmap_file(self, filepath):
if not isfile(filepath): if not isfile(filepath):
raise Exception("Beatmap file not found") raise Exception("Beatmap file not found")
with open(filepath, "rb") as f: with open(filepath, "rb") as f:
self._beatmap_hash.value = md5_file(f) self.structure["beatmap_hash"].value = md5_file(f)
def set_player_name(self, player_name): def set_player_name(self, player_name):
if type(player_name) is str: if type(player_name) is str:
player_name = player_name.encode() player_name = player_name.encode()
self._player_name.value = player_name self.structure["player_name"].value = player_name
def set_score_hash(self, md5_hash): def set_score_hash(self, md5_hash):
if type(md5_hash) is bytes: if type(md5_hash) is bytes:
md5_hash = md5_hash.decode() md5_hash = md5_hash.decode()
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32: if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid replay hash") raise Exception("Invalid replay hash")
self._score_hash.value = md5_hash.encode() self.structure["score_hash"].value = md5_hash.encode()
def set_score(self, score = None, combo = None, s300 = None, s100 = None, s50 = None, sgekis = None, skatus = None, miss = None): def set_score(self, score = None, combo = None, s300 = None, s100 = None, s50 = None, sgekis = None, skatus = None, miss = None):
if type(score) is int: self._score.value = score if type(score) is int: self.structure["score"].value = score
if type(combo) is int: self._combo.value = combo if type(combo) is int: self.structure["combo"].value = combo
if type(s300) is int: self._score_300s.value = s300 if type(s300) is int: self.structure["score_300s"].value = s300
if type(s100) is int: self._score_100s.value = s100 if type(s100) is int: self.structure["score_100s"].value = s100
if type(s50) is int: self._score_50s.value = s50 if type(s50) is int: self.structure["score_50s"].value = s50
if type(sgekis) is int: self._score_gekis.value = sgekis if type(sgekis) is int: self.structure["score_gekis"].value = sgekis
if type(skatus) is int: self._score_katus.value = skatus if type(skatus) is int: self.structure["score_katus"].value = skatus
if type(miss) is int: self._score_miss.value = miss if type(miss) is int: self.structure["score_miss"].value = miss
def set_mods(self, mods): def set_mods(self, mods):
if mods < 0 or mods > (1 << 30) - 1: if mods < 0 or mods > (1 << 30) - 1:
raise Exception("Mods are out of range") raise Exception("Mods are out of range")
self._mods.value = mods self.structure["mods"].value = mods
def set_lifebar_graph(self, graph): def set_lifebar_graph(self, graph):
t_graph = type(graph) t_graph = type(graph)
if t_graph is list: if t_graph is list:
self._lifebar_graph.value = b"".join(graph) self.structure["lifebar_graph"].value = b"".join(graph)
elif t_graph is str: elif t_graph is str:
self._lifebar_graph.value = graph.encode() self.structure["lifebar_graph"].value = graph.encode()
elif t_graph is bytes: elif t_graph is bytes:
self._lifebar_graph.value = graph self.structure["lifebar_graph"].value = graph
else: else:
raise Exception("Invalid lifebar data") raise Exception("Invalid lifebar data")
@ -166,23 +157,23 @@ class Replay:
timestamp += 62135599380000 # offset timestamp += 62135599380000 # offset
timestamp *= 10 ** 4 timestamp *= 10 ** 4
self._timestamp.value = timestamp self.structure["timestamp"].value = timestamp
def set_online_score_id(self, online_score_id): def set_online_score_id(self, online_score_id):
self._online_score_id.value = online_score_id self.structure["online_score_id"].value = online_score_id
# -------------------------------------------------------------------------- # --------------------------------------------------------------------------
# Update variables --------------------------------------------------------- # Update variables ---------------------------------------------------------
def update_perfect(self): def update_perfect(self):
m = [self._score_100s.value, self._score_50s.value, self._score_katus.value, self._score_miss.value] m = [self.structure["score_100s"].value, self.structure["score_50s"].value, self.structure["score_katus"].value, self.structure["score_miss"].value]
self._perfect.value = sum(m) == 0 self.structure["perfect"].value = sum(m) == 0
def update_score_hash(self): def update_score_hash(self):
self.set_score_hash( self.set_score_hash(
md5_str(b"%d%b%b%b%d%b" % ( md5_str(b"%d%b%b%b%d%b" % (
self._combo.value, b"osu", self._player_name.value, self.structure["combo"].value, b"osu", self.structure["player_name"].value,
self._beatmap_hash.value, self._score.value, self.get_rank() self.structure["beatmap_hash"].value, self.structure["score"].value, self.get_rank()
)) ))
) )
@ -194,10 +185,10 @@ class Replay:
# Get / Helpers ------------------------------------------------------------ # Get / Helpers ------------------------------------------------------------
def get_hits(self): def get_hits(self):
return sum([self._score_300s.value, self._score_100s.value, self._score_50s.value]) return sum([self.structure["score_300s"].value, self.structure["score_100s"].value, self.structure["score_50s"].value])
def get_possible_hits(self): def get_possible_hits(self):
return self.get_hits() + self._score_miss.value return self.get_hits() + self.structure["score_miss"].value
def get_rank(self): def get_rank(self):
hits = self.get_possible_hits() hits = self.get_possible_hits()
@ -205,17 +196,17 @@ class Replay:
print("Can not calculate rank without any score data (Defaulting to Fail)") print("Can not calculate rank without any score data (Defaulting to Fail)")
return osuRanks.F return osuRanks.F
r300 = self._score_300s.value / hits r300 = self.structure["score_300s"].value / hits
r50 = self._score_50s.value / hits r50 = self.structure["score_50s"].value / hits
h = osuMods.any_enabled(self._mods.value, osuMods.HIDDEN | osuMods.FLASHLIGHT) h = osuMods.any_enabled(self.structure["mods"].value, osuMods.HIDDEN | osuMods.FLASHLIGHT)
if r300 == 1: if r300 == 1:
return osuRanks.SSH if h else osuRanks.SS return osuRanks.SSH if h else osuRanks.SS
if r300 > .9 and r50 <= .01 and self._score_miss.value == 0: if r300 > .9 and r50 <= .01 and self.structure["score_miss"].value == 0:
return osuRanks.SH if h else osuRanks.S return osuRanks.SH if h else osuRanks.S
if r300 > .8 and self._score_miss.value == 0 or r300 > .9: if r300 > .8 and self.structure["score_miss"].value == 0 or r300 > .9:
return osuRanks.A return osuRanks.A
if r300 > .7 and self._score_miss.value == 0 or r300 > .8: if r300 > .7 and self.structure["score_miss"].value == 0 or r300 > .8:
return osuRanks.B return osuRanks.B
if r300 > .6: if r300 > .6:
return osuRanks.C return osuRanks.C
@ -231,11 +222,11 @@ class Replay:
self.write(_frame) self.write(_frame)
return return
if t_frame is ReplayFrame: if t_frame is ReplayFrame:
self._replay_data.value += bytes(frame) self.structure["replay_data"].value += bytes(frame)
elif t_frame is bytes: elif t_frame is bytes:
self._replay_data.value += frame self.structure["replay_data"].value += frame
elif t_frame is str: elif t_frame is str:
self._replay_data.value += frame.encode() self.structure["replay_data"].value += frame.encode()
else: else:
raise Exception("Invalid frame data") raise Exception("Invalid frame data")
@ -247,25 +238,7 @@ class Replay:
serializer = Serializer() serializer = Serializer()
attribs = [ a for a in self.__dir__() if not a.startswith("__") ] [ serializer.add( struc ) for struc in self.structure.values() ] # Add values to serializer
for attrib in attribs:
if attrib == "_end_of_attributes":
break
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
with open(filename, "wb") as f: with open(filename, "wb") as f:
f.write( serializer.serialize() ) f.write( serializer.flush() )
def load(self, filename):
serializer = Serializer()
attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
for attrib in attribs:
if attrib == "_end_of_attributes":
break
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
with open(filename, "rb") as f:
serializer.deserialize( f.read() )

15
test.py
View File

@ -3,8 +3,17 @@ from osuRepy.replay import Replay # Replay instance
from osuRepy.helpers import osuButtons, osuMods # Enum helpers from osuRepy.helpers import osuButtons, osuMods # Enum helpers
replay = Replay() # Create replay instance replay = Replay(beatmap_hash = "9e66c0a0eadced7d07f06b3968a74cc0") # Create replay instance
replay.load("replay.osr") replay.write( ReplayFrame(2, 256, 192, osuButtons.M1) ) # Add replay frame (time, x, y, buttons)
replay.save("test.osr") replay.set_score(score = 29284624,
s300 = 416,
s50 = 2,
miss = 1,
combo = 358) # Set score info
replay.set_mods(osuMods.HIDDEN | osuMods.DOUBLETIME) # Enable HDDT mods
replay.set_timestamp(636880715111611126, False)
replay.save("myReplay.osr") # Export replay to file