Compare commits
3 Commits
master
...
serializer
Author | SHA1 | Date | |
---|---|---|---|
1a0c63872c | |||
ba738e395d | |||
12ea8c4d1b |
|
@ -27,7 +27,7 @@ replay.save("myReplay.osr") # Export replay to file
|
||||||
# Enums (from helpers)
|
# Enums (from helpers)
|
||||||
### osuButtons
|
### osuButtons
|
||||||
|Name|Value|
|
|Name|Value|
|
||||||
|--|--|
|
|-|-|
|
||||||
|NONE|0|
|
|NONE|0|
|
||||||
|LEFTMOUSE|1|
|
|LEFTMOUSE|1|
|
||||||
|RIGHTMOUSE|2|
|
|RIGHTMOUSE|2|
|
||||||
|
@ -41,7 +41,7 @@ replay.save("myReplay.osr") # Export replay to file
|
||||||
|
|
||||||
### osuModes
|
### osuModes
|
||||||
|Name|Value|
|
|Name|Value|
|
||||||
|--|--|
|
|-|-|
|
||||||
|STANDARD|0|
|
|STANDARD|0|
|
||||||
|TAIKO|1|
|
|TAIKO|1|
|
||||||
|CATCH_THE_BEAT|2|
|
|CATCH_THE_BEAT|2|
|
||||||
|
@ -49,7 +49,7 @@ replay.save("myReplay.osr") # Export replay to file
|
||||||
|
|
||||||
### osuMods
|
### osuMods
|
||||||
|Name|Value|
|
|Name|Value|
|
||||||
|--|--|
|
|-|-|
|
||||||
|NOMOD|0|
|
|NOMOD|0|
|
||||||
|NOFAIL|1|
|
|NOFAIL|1|
|
||||||
|EASY|2|
|
|EASY|2|
|
||||||
|
@ -84,7 +84,7 @@ replay.save("myReplay.osr") # Export replay to file
|
||||||
|
|
||||||
### osuRanks
|
### osuRanks
|
||||||
|Name|Value|
|
|Name|Value|
|
||||||
|--|--|
|
|-|-|
|
||||||
|SSH|0|
|
|SSH|0|
|
||||||
|SH|1|
|
|SH|1|
|
||||||
|SS|2|
|
|SS|2|
|
||||||
|
|
|
@ -35,6 +35,6 @@ class ReplayFrame:
|
||||||
self._float_convert(self.x),
|
self._float_convert(self.x),
|
||||||
self._float_convert(self.y),
|
self._float_convert(self.y),
|
||||||
self.buttons)
|
self.buttons)
|
||||||
|
|
||||||
def __bytes__(self):
|
def __bytes__(self):
|
||||||
return str(self).encode()
|
return str(self).encode()
|
||||||
|
|
|
@ -1,20 +1,21 @@
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
TYPE_CHAR = b"b"
|
TYPE_CHAR = b"b", 1
|
||||||
TYPE_BYTE = b"B"
|
TYPE_BYTE = b"B", 1
|
||||||
TYPE_BOOL = b"?"
|
TYPE_BOOL = b"?", 1
|
||||||
TYPE_SHORT = b"h"
|
TYPE_SHORT = b"h", 2
|
||||||
TYPE_USHORT = b"H"
|
TYPE_USHORT = b"H", 2
|
||||||
TYPE_INT = b"i"
|
TYPE_INT = b"i", 4
|
||||||
TYPE_UINT = b"I"
|
TYPE_UINT = b"I", 4
|
||||||
TYPE_LONG = b"l"
|
TYPE_LONG = b"l", 4
|
||||||
TYPE_ULONG = b"L"
|
TYPE_ULONG = b"L", 4
|
||||||
TYPE_LLONG = b"q"
|
TYPE_LLONG = b"q", 8
|
||||||
TYPE_ULLONG = b"Q"
|
TYPE_ULLONG = b"Q", 8
|
||||||
TYPE_FLOAT = b"f"
|
TYPE_FLOAT = b"f", 4
|
||||||
TYPE_DOUBLE = b"d"
|
TYPE_DOUBLE = b"d", 8
|
||||||
TYPE_STRING = b"B%ds"
|
TYPE_STRING = b"s", 1
|
||||||
TYPE_BYTESTREAM = b"%ds"
|
|
||||||
|
TYPE_ULEB = b"s", 1, True # True just so they are different tuples
|
||||||
|
|
||||||
BYTEORDER_NATIVE = b"@"
|
BYTEORDER_NATIVE = b"@"
|
||||||
BYTEORDER_LITTLE = b"<"
|
BYTEORDER_LITTLE = b"<"
|
||||||
|
@ -22,7 +23,6 @@ BYTEORDER_BIG = b">"
|
||||||
|
|
||||||
def encode_uleb(val):
|
def encode_uleb(val):
|
||||||
data = b""
|
data = b""
|
||||||
pos = 0
|
|
||||||
while val != 0:
|
while val != 0:
|
||||||
b = val & 0x7f
|
b = val & 0x7f
|
||||||
val >>= 7
|
val >>= 7
|
||||||
|
@ -73,10 +73,18 @@ def guess_type(value):
|
||||||
|
|
||||||
raise Exception("Unable to guess type")
|
raise Exception("Unable to guess type")
|
||||||
|
|
||||||
class Serializable:
|
def read(value_type, stream):
|
||||||
post_serialized = None
|
#if value_type == TYPE_STRING:
|
||||||
|
# read()
|
||||||
|
|
||||||
def __init__(self, value, type = None, post = None):
|
struct.unpack(value_type[0], stream[:value_type[1]])
|
||||||
|
|
||||||
|
class Serializable:
|
||||||
|
length = None
|
||||||
|
prefix = b""
|
||||||
|
filter = None
|
||||||
|
|
||||||
|
def __init__(self, value, type = None, length = None, prefix = b"", filter = None):
|
||||||
self.value = value
|
self.value = value
|
||||||
|
|
||||||
if type is not None:
|
if type is not None:
|
||||||
|
@ -84,29 +92,76 @@ class Serializable:
|
||||||
else:
|
else:
|
||||||
self.type = guess_type(type)
|
self.type = guess_type(type)
|
||||||
|
|
||||||
self.post_serialized = post
|
self.length = length
|
||||||
|
self.prefix = prefix
|
||||||
|
self.filter = filter
|
||||||
|
|
||||||
def get_struct_fmt(self):
|
def get_struct_fmt(self, length = 1, uleb_length_override = -1):
|
||||||
if self.type in [TYPE_STRING, TYPE_BYTESTREAM]:
|
struct_fmt = self.type[0]
|
||||||
return self.type % len(self.get_value()[-1])
|
if length > 1:
|
||||||
return self.type
|
struct_fmt = b"%d%b" % (length, self.type[0])
|
||||||
|
if self.length is not None:
|
||||||
|
if self.length == TYPE_ULEB:
|
||||||
|
struct_fmt = b"%d%b%b" % (len(self.get_value()[0]) if uleb_length_override == -1 else uleb_length_override, self.length[0], struct_fmt)
|
||||||
|
else:
|
||||||
|
struct_fmt = b"%b%b" % (self.length[0], struct_fmt)
|
||||||
|
return struct_fmt
|
||||||
|
|
||||||
def get_value(self):
|
def get_value(self):
|
||||||
def _get_value():
|
val = [self.value]
|
||||||
if self.type == TYPE_STRING:
|
if self.filter is not None:
|
||||||
return [0x0b, encode_uleb(len(self.value)) + self.value]
|
val[-1] = self.filter(val[-1], 0)
|
||||||
return [self.value]
|
if self.length is not None:
|
||||||
|
if self.length == TYPE_ULEB:
|
||||||
val = _get_value()
|
val.insert(0, encode_uleb(len(val[-1])))
|
||||||
if self.post_serialized is not None:
|
else:
|
||||||
val[-1] = self.post_serialized(val[-1])
|
val.insert(0, len(val[-1]))
|
||||||
|
|
||||||
return val
|
return val
|
||||||
|
|
||||||
def pack(self, byte_order = BYTEORDER_LITTLE):
|
def pack(self, byte_order = BYTEORDER_LITTLE):
|
||||||
struct_fmt = byte_order + self.get_struct_fmt()
|
length = 1
|
||||||
#print(struct_fmt, self.get_value())
|
if self.type == TYPE_STRING:
|
||||||
return struct.pack(struct_fmt, *self.get_value())
|
length = len(self.get_value()[-1])
|
||||||
|
struct_fmt = byte_order + self.get_struct_fmt(length)
|
||||||
|
packed = struct.pack(struct_fmt, *self.get_value())
|
||||||
|
if self.prefix is not None:
|
||||||
|
packed = self.prefix + packed
|
||||||
|
return packed
|
||||||
|
|
||||||
|
def unpack(self, stream, byte_order = BYTEORDER_LITTLE):
|
||||||
|
length = 1
|
||||||
|
read_pos = 0
|
||||||
|
extra_read = 0
|
||||||
|
remove_struct_fmt = []
|
||||||
|
if self.type == TYPE_STRING:
|
||||||
|
if self.length == TYPE_ULEB:
|
||||||
|
extra_read = 1
|
||||||
|
read_pos += 1 # Skip 0x0b
|
||||||
|
length, _read_pos = decode_uleb(stream[read_pos:])
|
||||||
|
read_pos += _read_pos - 1
|
||||||
|
else:
|
||||||
|
l = Serializable(0, self.length)
|
||||||
|
read_pos = l.unpack(stream)
|
||||||
|
length = l.get_value()[-1]
|
||||||
|
remove_struct_fmt.append(0)
|
||||||
|
|
||||||
|
|
||||||
|
struct_fmt = byte_order + self.get_struct_fmt(length, read_pos)
|
||||||
|
length += extra_read
|
||||||
|
|
||||||
|
if len(remove_struct_fmt):
|
||||||
|
for rem in remove_struct_fmt:
|
||||||
|
struct_fmt = struct_fmt[:rem + 1] + struct_fmt[rem + 2:]
|
||||||
|
|
||||||
|
data = struct.unpack(struct_fmt, stream[read_pos:read_pos + self.type[1] * length])
|
||||||
|
|
||||||
|
if self.filter is not None:
|
||||||
|
data = list(data)
|
||||||
|
data[-1] = self.filter(data[-1], 1)
|
||||||
|
|
||||||
|
self.value = data[-1]
|
||||||
|
|
||||||
|
return read_pos + self.type[1] * length
|
||||||
|
|
||||||
class Serializer:
|
class Serializer:
|
||||||
def __init__(self, byte_order = BYTEORDER_LITTLE):
|
def __init__(self, byte_order = BYTEORDER_LITTLE):
|
||||||
|
@ -119,6 +174,13 @@ class Serializer:
|
||||||
else:
|
else:
|
||||||
self.stack.append(serializable)
|
self.stack.append(serializable)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
return b"".join(s.pack() for s in self.stack)
|
||||||
|
|
||||||
|
def deserialize(self, stream):
|
||||||
|
for s in self.stack:
|
||||||
|
stream = stream[s.unpack(stream):]
|
||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
return b"".join(s.pack() for s in self.stack)
|
return b"".join(s.pack() for s in self.stack)
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,8 @@
|
||||||
import struct
|
import struct
|
||||||
import string
|
import string
|
||||||
#import lzma
|
import lzma
|
||||||
import pylzma # Required cause pythons standard library lzma has missing support
|
import pylzma # Required cause pythons standard library lzma has missing support
|
||||||
|
|
||||||
from collections import OrderedDict
|
|
||||||
|
|
||||||
from .frame import ReplayFrame
|
from .frame import ReplayFrame
|
||||||
|
|
||||||
from .helpers import osuModes
|
from .helpers import osuModes
|
||||||
|
@ -20,10 +18,8 @@ from hashlib import md5 as _md5
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
def md5_str(data):
|
def md5_str(data):
|
||||||
print(data)
|
|
||||||
if type(data) is str:
|
if type(data) is str:
|
||||||
data = data.encode("ascii")
|
data = data.encode("ascii")
|
||||||
print(_md5(data).hexdigest().encode())
|
|
||||||
return _md5(data).hexdigest().encode()
|
return _md5(data).hexdigest().encode()
|
||||||
|
|
||||||
def md5_file(file):
|
def md5_file(file):
|
||||||
|
@ -36,43 +32,56 @@ def md5_file(file):
|
||||||
hash = _md5()
|
hash = _md5()
|
||||||
for chunk in iter(lambda: file.read(4096), b""):
|
for chunk in iter(lambda: file.read(4096), b""):
|
||||||
hash.update(chunk)
|
hash.update(chunk)
|
||||||
return hash.hexdigest().encode()
|
return hash.hexdigest()
|
||||||
|
|
||||||
def append_and_compress(data):
|
def append_and_compress(data, mode):
|
||||||
return lzma_compress(data + b"-12345|0|0|1337,")
|
if mode == 0:
|
||||||
|
return lzma_compress(data + b"-12345|0|0|1337,")
|
||||||
|
else:
|
||||||
|
return lzma_decompress(data)[:-len(b"-12345|0|0|1337,")]
|
||||||
|
|
||||||
def lzma_compress(data):
|
def lzma_compress(data):
|
||||||
comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False)
|
comp = pylzma.compress(data, dictionary = 21, fastBytes = 255, eos = False)
|
||||||
comp = comp[:5] + struct.pack(b"<Q", len(data)) + comp[5:] # Append uncompressed length
|
comp = comp[:5] + struct.pack(b"<Q", len(data)) + comp[5:] # Append uncompressed length
|
||||||
return struct.pack("<I", len(comp)) + comp # For some odd ass reason this doesnt use ULEB
|
return comp
|
||||||
|
|
||||||
|
def lzma_decompress(data):
|
||||||
|
comp = lzma.decompress(data)
|
||||||
|
return comp
|
||||||
|
|
||||||
class Replay:
|
class Replay:
|
||||||
structure = OrderedDict([
|
_mode = Serializable(osuModes.STANDARD, TYPE_BYTE)
|
||||||
("mode", Serializable(osuModes.STANDARD, TYPE_BYTE)),
|
_osu_version = Serializable(20181216, TYPE_INT)
|
||||||
("osu_version", Serializable(20181216, TYPE_INT)),
|
#_beatmap_hash_length = Serializable(32, TYPE_ULEB)
|
||||||
("beatmap_hash",Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)),
|
_beatmap_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
|
||||||
("player_name", Serializable(b"osu!", TYPE_STRING)),
|
#_player_name_length = Serializable(4, TYPE_ULEB)
|
||||||
("score_hash", Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)),
|
_player_name = Serializable(b"osu!", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
|
||||||
|
#_score_hash_length = Serializable(32, TYPE_ULEB)
|
||||||
|
_score_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
|
||||||
|
|
||||||
("score_300s", Serializable(0, TYPE_USHORT)),
|
_score_300s = Serializable(0, TYPE_USHORT)
|
||||||
("score_100s", Serializable(0, TYPE_USHORT)),
|
_score_100s = Serializable(0, TYPE_USHORT)
|
||||||
("score_50s", Serializable(0, TYPE_USHORT)),
|
_score_50s = Serializable(0, TYPE_USHORT)
|
||||||
("score_gekis", Serializable(0, TYPE_USHORT)),
|
_score_gekis = Serializable(0, TYPE_USHORT)
|
||||||
("score_katus", Serializable(0, TYPE_USHORT)),
|
_score_katus = Serializable(0, TYPE_USHORT)
|
||||||
("score_miss", Serializable(0, TYPE_USHORT)),
|
_score_miss = Serializable(0, TYPE_USHORT)
|
||||||
("score", Serializable(0, TYPE_INT)),
|
_score = Serializable(0, TYPE_INT)
|
||||||
("combo", Serializable(0, TYPE_USHORT)),
|
_combo = Serializable(0, TYPE_USHORT)
|
||||||
("perfect", Serializable(True, TYPE_BOOL)),
|
_perfect = Serializable(True, TYPE_BOOL)
|
||||||
|
|
||||||
("mods", Serializable(osuMods.NOMOD, TYPE_INT)),
|
_mods = Serializable(osuMods.NOMOD, TYPE_INT)
|
||||||
|
|
||||||
("lifebar_graph", Serializable(b"0|1,", TYPE_STRING)),
|
#_lifebar_graph_length = Serializable(4, TYPE_ULEB)
|
||||||
("timestamp", Serializable(0, TYPE_ULLONG)),
|
_lifebar_graph = Serializable(b"0|1,", TYPE_STRING, length = TYPE_ULEB, prefix = b"\x0b")
|
||||||
|
_timestamp = Serializable(0, TYPE_ULLONG)
|
||||||
|
|
||||||
("replay_data", Serializable(b"", TYPE_BYTESTREAM, post = append_and_compress)),
|
#_replay_data_length = Serializable(0, TYPE_UINT)
|
||||||
|
_replay_data = Serializable(b"", TYPE_STRING, length = TYPE_UINT, filter = append_and_compress)
|
||||||
|
|
||||||
("online_score_id", Serializable(0, TYPE_ULLONG))
|
_online_score_id = Serializable(0, TYPE_ULLONG)
|
||||||
])
|
|
||||||
|
# Marker (Only used as attribute section marker for auto-serializing)
|
||||||
|
_end_of_attributes = None
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
allowed_kwargs = {
|
allowed_kwargs = {
|
||||||
|
@ -93,12 +102,12 @@ class Replay:
|
||||||
def set_mode(self, mode):
|
def set_mode(self, mode):
|
||||||
if mode > 3 or mode < 0:
|
if mode > 3 or mode < 0:
|
||||||
raise Exception("Invalid mode")
|
raise Exception("Invalid mode")
|
||||||
self.structure["mode"].value = mode
|
self._mode.value = mode
|
||||||
|
|
||||||
def set_osu_version(self, osu_version):
|
def set_osu_version(self, osu_version):
|
||||||
if type(osu_version) is not int:
|
if type(osu_version) is not int:
|
||||||
raise Exception("osu! version must be an int")
|
raise Exception("osu! version must be an int")
|
||||||
self.structure["osu_version"].value = osu_version
|
self._osu_version.value = osu_version
|
||||||
|
|
||||||
def set_beatmap_hash(self, md5_hash):
|
def set_beatmap_hash(self, md5_hash):
|
||||||
if type(md5_hash) is bytes:
|
if type(md5_hash) is bytes:
|
||||||
|
@ -106,49 +115,49 @@ class Replay:
|
||||||
|
|
||||||
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
|
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
|
||||||
raise Exception("Invalid beatmap hash")
|
raise Exception("Invalid beatmap hash")
|
||||||
self.structure["beatmap_hash"].value = md5_hash.encode()
|
self._beatmap_hash.value = md5_hash.encode()
|
||||||
|
|
||||||
def set_beatmap_file(self, filepath):
|
def set_beatmap_file(self, filepath):
|
||||||
if not isfile(filepath):
|
if not isfile(filepath):
|
||||||
raise Exception("Beatmap file not found")
|
raise Exception("Beatmap file not found")
|
||||||
with open(filepath, "rb") as f:
|
with open(filepath, "rb") as f:
|
||||||
self.structure["beatmap_hash"].value = md5_file(f)
|
self._beatmap_hash.value = md5_file(f)
|
||||||
|
|
||||||
def set_player_name(self, player_name):
|
def set_player_name(self, player_name):
|
||||||
if type(player_name) is str:
|
if type(player_name) is str:
|
||||||
player_name = player_name.encode()
|
player_name = player_name.encode()
|
||||||
self.structure["player_name"].value = player_name
|
self._player_name.value = player_name
|
||||||
|
|
||||||
def set_score_hash(self, md5_hash):
|
def set_score_hash(self, md5_hash):
|
||||||
if type(md5_hash) is bytes:
|
if type(md5_hash) is bytes:
|
||||||
md5_hash = md5_hash.decode()
|
md5_hash = md5_hash.decode()
|
||||||
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
|
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
|
||||||
raise Exception("Invalid replay hash")
|
raise Exception("Invalid replay hash")
|
||||||
self.structure["score_hash"].value = md5_hash.encode()
|
self._score_hash.value = md5_hash.encode()
|
||||||
|
|
||||||
def set_score(self, score = None, combo = None, s300 = None, s100 = None, s50 = None, sgekis = None, skatus = None, miss = None):
|
def set_score(self, score = None, combo = None, s300 = None, s100 = None, s50 = None, sgekis = None, skatus = None, miss = None):
|
||||||
if type(score) is int: self.structure["score"].value = score
|
if type(score) is int: self._score.value = score
|
||||||
if type(combo) is int: self.structure["combo"].value = combo
|
if type(combo) is int: self._combo.value = combo
|
||||||
if type(s300) is int: self.structure["score_300s"].value = s300
|
if type(s300) is int: self._score_300s.value = s300
|
||||||
if type(s100) is int: self.structure["score_100s"].value = s100
|
if type(s100) is int: self._score_100s.value = s100
|
||||||
if type(s50) is int: self.structure["score_50s"].value = s50
|
if type(s50) is int: self._score_50s.value = s50
|
||||||
if type(sgekis) is int: self.structure["score_gekis"].value = sgekis
|
if type(sgekis) is int: self._score_gekis.value = sgekis
|
||||||
if type(skatus) is int: self.structure["score_katus"].value = skatus
|
if type(skatus) is int: self._score_katus.value = skatus
|
||||||
if type(miss) is int: self.structure["score_miss"].value = miss
|
if type(miss) is int: self._score_miss.value = miss
|
||||||
|
|
||||||
def set_mods(self, mods):
|
def set_mods(self, mods):
|
||||||
if mods < 0 or mods > (1 << 30) - 1:
|
if mods < 0 or mods > (1 << 30) - 1:
|
||||||
raise Exception("Mods are out of range")
|
raise Exception("Mods are out of range")
|
||||||
self.structure["mods"].value = mods
|
self._mods.value = mods
|
||||||
|
|
||||||
def set_lifebar_graph(self, graph):
|
def set_lifebar_graph(self, graph):
|
||||||
t_graph = type(graph)
|
t_graph = type(graph)
|
||||||
if t_graph is list:
|
if t_graph is list:
|
||||||
self.structure["lifebar_graph"].value = b"".join(graph)
|
self._lifebar_graph.value = b"".join(graph)
|
||||||
elif t_graph is str:
|
elif t_graph is str:
|
||||||
self.structure["lifebar_graph"].value = graph.encode()
|
self._lifebar_graph.value = graph.encode()
|
||||||
elif t_graph is bytes:
|
elif t_graph is bytes:
|
||||||
self.structure["lifebar_graph"].value = graph
|
self._lifebar_graph.value = graph
|
||||||
else:
|
else:
|
||||||
raise Exception("Invalid lifebar data")
|
raise Exception("Invalid lifebar data")
|
||||||
|
|
||||||
|
@ -157,23 +166,23 @@ class Replay:
|
||||||
timestamp += 62135599380000 # offset
|
timestamp += 62135599380000 # offset
|
||||||
timestamp *= 10 ** 4
|
timestamp *= 10 ** 4
|
||||||
|
|
||||||
self.structure["timestamp"].value = timestamp
|
self._timestamp.value = timestamp
|
||||||
|
|
||||||
def set_online_score_id(self, online_score_id):
|
def set_online_score_id(self, online_score_id):
|
||||||
self.structure["online_score_id"].value = online_score_id
|
self._online_score_id.value = online_score_id
|
||||||
|
|
||||||
# --------------------------------------------------------------------------
|
# --------------------------------------------------------------------------
|
||||||
# Update variables ---------------------------------------------------------
|
# Update variables ---------------------------------------------------------
|
||||||
|
|
||||||
def update_perfect(self):
|
def update_perfect(self):
|
||||||
m = [self.structure["score_100s"].value, self.structure["score_50s"].value, self.structure["score_katus"].value, self.structure["score_miss"].value]
|
m = [self._score_100s.value, self._score_50s.value, self._score_katus.value, self._score_miss.value]
|
||||||
self.structure["perfect"].value = sum(m) == 0
|
self._perfect.value = sum(m) == 0
|
||||||
|
|
||||||
def update_score_hash(self):
|
def update_score_hash(self):
|
||||||
self.set_score_hash(
|
self.set_score_hash(
|
||||||
md5_str(b"%d%b%b%b%d%b" % (
|
md5_str(b"%d%b%b%b%d%b" % (
|
||||||
self.structure["combo"].value, b"osu", self.structure["player_name"].value,
|
self._combo.value, b"osu", self._player_name.value,
|
||||||
self.structure["beatmap_hash"].value, self.structure["score"].value, self.get_rank()
|
self._beatmap_hash.value, self._score.value, self.get_rank()
|
||||||
))
|
))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -185,10 +194,10 @@ class Replay:
|
||||||
# Get / Helpers ------------------------------------------------------------
|
# Get / Helpers ------------------------------------------------------------
|
||||||
|
|
||||||
def get_hits(self):
|
def get_hits(self):
|
||||||
return sum([self.structure["score_300s"].value, self.structure["score_100s"].value, self.structure["score_50s"].value])
|
return sum([self._score_300s.value, self._score_100s.value, self._score_50s.value])
|
||||||
|
|
||||||
def get_possible_hits(self):
|
def get_possible_hits(self):
|
||||||
return self.get_hits() + self.structure["score_miss"].value
|
return self.get_hits() + self._score_miss.value
|
||||||
|
|
||||||
def get_rank(self):
|
def get_rank(self):
|
||||||
hits = self.get_possible_hits()
|
hits = self.get_possible_hits()
|
||||||
|
@ -196,17 +205,17 @@ class Replay:
|
||||||
print("Can not calculate rank without any score data (Defaulting to Fail)")
|
print("Can not calculate rank without any score data (Defaulting to Fail)")
|
||||||
return osuRanks.F
|
return osuRanks.F
|
||||||
|
|
||||||
r300 = self.structure["score_300s"].value / hits
|
r300 = self._score_300s.value / hits
|
||||||
r50 = self.structure["score_50s"].value / hits
|
r50 = self._score_50s.value / hits
|
||||||
h = osuMods.any_enabled(self.structure["mods"].value, osuMods.HIDDEN | osuMods.FLASHLIGHT)
|
h = osuMods.any_enabled(self._mods.value, osuMods.HIDDEN | osuMods.FLASHLIGHT)
|
||||||
|
|
||||||
if r300 == 1:
|
if r300 == 1:
|
||||||
return osuRanks.SSH if h else osuRanks.SS
|
return osuRanks.SSH if h else osuRanks.SS
|
||||||
if r300 > .9 and r50 <= .01 and self.structure["score_miss"].value == 0:
|
if r300 > .9 and r50 <= .01 and self._score_miss.value == 0:
|
||||||
return osuRanks.SH if h else osuRanks.S
|
return osuRanks.SH if h else osuRanks.S
|
||||||
if r300 > .8 and self.structure["score_miss"].value == 0 or r300 > .9:
|
if r300 > .8 and self._score_miss.value == 0 or r300 > .9:
|
||||||
return osuRanks.A
|
return osuRanks.A
|
||||||
if r300 > .7 and self.structure["score_miss"].value == 0 or r300 > .8:
|
if r300 > .7 and self._score_miss.value == 0 or r300 > .8:
|
||||||
return osuRanks.B
|
return osuRanks.B
|
||||||
if r300 > .6:
|
if r300 > .6:
|
||||||
return osuRanks.C
|
return osuRanks.C
|
||||||
|
@ -222,11 +231,11 @@ class Replay:
|
||||||
self.write(_frame)
|
self.write(_frame)
|
||||||
return
|
return
|
||||||
if t_frame is ReplayFrame:
|
if t_frame is ReplayFrame:
|
||||||
self.structure["replay_data"].value += bytes(frame)
|
self._replay_data.value += bytes(frame)
|
||||||
elif t_frame is bytes:
|
elif t_frame is bytes:
|
||||||
self.structure["replay_data"].value += frame
|
self._replay_data.value += frame
|
||||||
elif t_frame is str:
|
elif t_frame is str:
|
||||||
self.structure["replay_data"].value += frame.encode()
|
self._replay_data.value += frame.encode()
|
||||||
else:
|
else:
|
||||||
raise Exception("Invalid frame data")
|
raise Exception("Invalid frame data")
|
||||||
|
|
||||||
|
@ -238,7 +247,25 @@ class Replay:
|
||||||
|
|
||||||
serializer = Serializer()
|
serializer = Serializer()
|
||||||
|
|
||||||
[ serializer.add( struc ) for struc in self.structure.values() ] # Add values to serializer
|
attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
|
||||||
|
for attrib in attribs:
|
||||||
|
if attrib == "_end_of_attributes":
|
||||||
|
break
|
||||||
|
|
||||||
|
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
|
||||||
|
|
||||||
with open(filename, "wb") as f:
|
with open(filename, "wb") as f:
|
||||||
f.write( serializer.flush() )
|
f.write( serializer.serialize() )
|
||||||
|
|
||||||
|
def load(self, filename):
|
||||||
|
serializer = Serializer()
|
||||||
|
|
||||||
|
attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
|
||||||
|
for attrib in attribs:
|
||||||
|
if attrib == "_end_of_attributes":
|
||||||
|
break
|
||||||
|
|
||||||
|
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
|
||||||
|
|
||||||
|
with open(filename, "rb") as f:
|
||||||
|
serializer.deserialize( f.read() )
|
||||||
|
|
15
test.py
15
test.py
|
@ -3,17 +3,8 @@ from osuRepy.replay import Replay # Replay instance
|
||||||
|
|
||||||
from osuRepy.helpers import osuButtons, osuMods # Enum helpers
|
from osuRepy.helpers import osuButtons, osuMods # Enum helpers
|
||||||
|
|
||||||
replay = Replay(beatmap_hash = "9e66c0a0eadced7d07f06b3968a74cc0") # Create replay instance
|
replay = Replay() # Create replay instance
|
||||||
|
|
||||||
replay.write( ReplayFrame(2, 256, 192, osuButtons.M1) ) # Add replay frame (time, x, y, buttons)
|
replay.load("replay.osr")
|
||||||
|
|
||||||
replay.set_score(score = 29284624,
|
replay.save("test.osr")
|
||||||
s300 = 416,
|
|
||||||
s50 = 2,
|
|
||||||
miss = 1,
|
|
||||||
combo = 358) # Set score info
|
|
||||||
|
|
||||||
replay.set_mods(osuMods.HIDDEN | osuMods.DOUBLETIME) # Enable HDDT mods
|
|
||||||
replay.set_timestamp(636880715111611126, False)
|
|
||||||
|
|
||||||
replay.save("myReplay.osr") # Export replay to file
|
|
Loading…
Reference in New Issue
Block a user