type rewrite

This commit is contained in:
Emily 2019-03-13 09:07:46 +01:00
parent a2a7ca2ffb
commit 677ece2cb9
5 changed files with 178 additions and 72 deletions

View File

@ -27,4 +27,4 @@ class ReplayFrame:
return "%s|%s|%s|%s," % (self.delta, self.x, self.y, self.buttons)
def __bytes__(self):
return str(self).encode()
return str(self).encode()

View File

@ -2,3 +2,5 @@ from . import osuModes
from . import osuMods
from . import osuRanks
from . import osuButtons
from . import typeSerializer

View File

@ -0,0 +1,109 @@
import struct
TYPE_CHAR = b"b"
TYPE_BYTE = b"B"
TYPE_BOOL = b"?"
TYPE_SHORT = b"h"
TYPE_USHORT = b"H"
TYPE_INT = b"i"
TYPE_UINT = b"I"
TYPE_LONG = b"l"
TYPE_ULONG = b"L"
TYPE_LLONG = b"q"
TYPE_ULLONG = b"Q"
TYPE_FLOAT = b"f"
TYPE_DOUBLE = b"d"
TYPE_STRING = b"B%ds"
TYPE_BYTESTREAM = b"%ds"
BYTEORDER_NATIVE = b"@"
BYTEORDER_LITTLE = b"<"
BYTEORDER_BIG = b">"
def guess_type(value):
t_value = type(value)
if t_value is bool:
return TYPE_BOOL
if t_value is float:
return TYPE_FLOAT
if t_value is str:
return TYPE_STRING
if t_value is bytes:
return TYPE_STRING
if t_value is int:
bit_len = 0
while value:
value >>= 1
bit_len += 1
byte_len = (bit_len + 1) // 2
if byte_len <= 1:
return TYPE_BYTE
if byte_len <= 2:
return TYPE_USHORT
if byte_len <= 4:
return TYPE_UINT
if byte_len <= 8:
return TYPE_ULLONG
raise Exception("Unable to guess type")
class Serializable:
post_serialized = None
def __init__(self, value, type = None, post = None):
self.value = value
if type is not None:
self.type = type
else:
self.type = guess_type(type)
self.post_serialized = post
def get_struct_fmt(self):
if self.type in [TYPE_STRING, TYPE_BYTESTREAM]:
return self.type % len(self.get_value()[-1])
return self.type
def get_value(self):
def _get_value():
if self.type == TYPE_STRING:
return [11, bytes([len(self.value)]) + self.value]
return [self.value]
val = _get_value()
if self.post_serialized is not None:
val[-1] = self.post_serialized(val[-1])
return val
def pack(self, byte_order = BYTEORDER_LITTLE):
struct_fmt = byte_order + self.get_struct_fmt()
print(struct_fmt, self.get_value())
return struct.pack(struct_fmt, *self.get_value())
class Serializer:
def __init__(self, byte_order = BYTEORDER_LITTLE):
self.stack = []
self.byte_order = byte_order
def add(self, serializable):
if type(serializable) is list:
self.stack += serializable
else:
self.stack.append(serializable)
def flush(self):
return b"".join(s.pack() for s in self.stack)
"""
struct_fmt = self.byte_order + b"".join( s.get_struct_fmt() for s in self.stack )
print(struct_fmt)
print([ s.get_value() for s in self.stack ])
return struct.pack(struct_fmt, *[ s.get_value() for s in self.stack ])
"""
def __bytes__(self):
return self.flush()

View File

@ -6,6 +6,8 @@ from .helpers import osuModes
from .helpers import osuMods
from .helpers import osuRanks
from .helpers.typeSerializer import *
from io import BufferedReader
from os.path import isfile
from hashlib import md5 as _md5
@ -27,31 +29,43 @@ def md5_file(file):
hash.update(chunk)
return hash.hexdigest()
def lzma_compress(data):
return bytes([len(data)]) + lzma.compress(data + b"-12345|0|0|1337,", format = lzma.FORMAT_ALONE, filters = [
{
"id": lzma.FILTER_LZMA1,
"preset": lzma.PRESET_DEFAULT,
"dict_size": 1 << 21
}
])
class Replay:
mode = osuModes.STANDARD # Byte
osu_version = 20131216 # Int
beatmap_hash = b"d41d8cd98f00b204e9800998ecf8427e" # Byte[]
player_name = b"osu!" # Byte[]
score_hash = b"d41d8cd98f00b204e9800998ecf8427e" # Byte[]
mode = Serializable(osuModes.STANDARD, TYPE_BYTE)
osu_version = Serializable(20131216, TYPE_INT)
beatmap_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)
player_name = Serializable(b"osu!", TYPE_STRING)
score_hash = Serializable(b"d41d8cd98f00b204e9800998ecf8427e", TYPE_STRING)
score_300s = 0 # uShort
score_100s = 0 # uShort
score_50s = 0 # uShort
score_gekis = 0 # uShort
score_katus = 0 # uShort
score_miss = 0 # uShort
score = 0 # Int
combo = 0 # uShort
perfect = True # Byte
score_300s = Serializable(0, TYPE_USHORT)
score_100s = Serializable(0, TYPE_USHORT)
score_50s = Serializable(0, TYPE_USHORT)
score_gekis = Serializable(0, TYPE_USHORT)
score_katus = Serializable(0, TYPE_USHORT)
score_miss = Serializable(0, TYPE_USHORT)
score = Serializable(0, TYPE_INT)
combo = Serializable(0, TYPE_USHORT)
perfect = Serializable(True, TYPE_BOOL)
mods = osuMods.NOMOD # Int
mods = Serializable(osuMods.NOMOD, TYPE_INT)
lifebar_graph = b"0|1," # Byte[]
timestamp = 0 # Long
lifebar_graph = Serializable(b"0|1,", TYPE_STRING)
timestamp = Serializable(0, TYPE_ULLONG)
replay_data = b"" # Byte[]
replay_data = Serializable(b"", TYPE_BYTESTREAM, post = lzma_compress)
online_score_id = 0 # Long
online_score_id = Serializable(0, TYPE_ULLONG)
# Marker (Only used as attribute section marker for auto-serializing)
end_of_attributes = None
def __init__(self, **kwargs):
allowed_kwargs = {
@ -67,54 +81,54 @@ class Replay:
# Set validators -----------------------------------------------------------
def set_mode(self, mode_id):
if mode_id > 3 or mode_id < 0:
def set_mode(self, mode):
if mode > 3 or mode < 0:
raise Exception("Invalid mode")
self.mode_id = mode_id
self.mode.value = mode
def set_osu_version(self, osu_version):
if type(osu_version) is not int:
raise Exception("osu! version must be an int")
self.osu_version = osu_version
self.osu_version.value = osu_version
def set_beatmap_hash(self, md5_hash):
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid beatmap hash")
self.beatmap_hash = md5_hash
self.beatmap_hash.value = md5_hash
def set_beatmap(self, filepath):
if not isfile(filepath):
raise Exception("Beatmap file not found")
with open(filepath, "rb") as f:
self.beatmap_hash = md5_file(f)
self.beatmap_hash.value = md5_file(f)
def set_player_name(self, player_name):
self.player_name = player_name
self.player_name.value = player_name
def set_score_hash(self, md5_hash):
if type(md5_hash) is bytes:
md5_hash = md5_hash.decode()
if len(md5_hash) != 32 or len([x for x in md5_hash if x in string.hexdigits]) != 32:
raise Exception("Invalid replay hash")
self.score_hash = md5_hash.encode()
self.score_hash.value = md5_hash.encode()
def set_mods(self, mods):
if mods < 0 or mods > (1 << 30) - 1:
raise Exception("Mods are out of range")
self.mods = mods
self.mods.value = mods
# --------------------------------------------------------------------------
# Update variables ---------------------------------------------------------
def update_perfect(self):
m = [self.score_100s, self.score_50s, self.score_katus, self.score_miss]
self.perfect = sum(m) == 0
m = [self.score_100s.value, self.score_50s.value, self.score_katus.value, self.score_miss.value]
self.perfect.value = sum(m) == 0
def update_score_hash(self):
self.set_score_hash(
md5_str("%d%s%s%s%d%d" % (
self.combo, "osu", self.player_name,
self.beatmap_hash, self.score, self.get_rank()
self.combo.value, "osu", self.player_name.value,
self.beatmap_hash.value, self.score.value, self.get_rank()
))
)
@ -126,27 +140,27 @@ class Replay:
# Get / Helpers ------------------------------------------------------------
def get_hits(self):
return sum([self.score_300s, self.score_100s, self.score_50s])
return sum([self.score_300s.value, self.score_100s.value, self.score_50s.value])
def get_possible_hits(self):
return self.get_hits() + self.score_miss
return self.get_hits() + self.score_miss.value
def get_rank(self): # We dont give out F ranks around here
hits = self.get_possible_hits()
if hits == 0:
raise Exception("Can not calculate rank without any score data")
r300 = self.score_300s / hits
r50 = self.score_50s / hits
h = osuMods.any_enabled(self.mods, osuMods.HIDDEN | osuMods.FLASHLIGHT)
r300 = self.score_300s.value / hits
r50 = self.score_50s.value / hits
h = osuMods.any_enabled(self.mods.value, osuMods.HIDDEN | osuMods.FLASHLIGHT)
if r300 == 1:
return osuRanks.SSH if h else osuRanks.SS
if r300 > .9 and r50 <= .01 and self.score_miss == 0:
if r300 > .9 and r50 <= .01 and self.score_miss.value == 0:
return osuRanks.SH if h else osuRanks.S
if r300 > .8 and self.score_miss == 0 or r300 > .9:
if r300 > .8 and self.score_miss.value == 0 or r300 > .9:
return osuRanks.A
if r300 > .7 and self.score_miss == 0 or r300 > .8:
if r300 > .7 and self.score_miss.value == 0 or r300 > .8:
return osuRanks.B
if r300 > .6:
return osuRanks.C
@ -161,7 +175,7 @@ class Replay:
self.write(_frame)
return
self.replay_data += bytes(frame)
self.replay_data.value += bytes(frame)
# --------------------------------------------------------------------------
# IO replay data -----------------------------------------------------------
@ -169,34 +183,15 @@ class Replay:
def save(self, filename):
self.update()
compressed_replay = lzma.compress(self.replay_data + b"-12345|0|0|1337,", format = lzma.FORMAT_ALONE, filters = [
{
"id": lzma.FILTER_LZMA1,
"preset": lzma.PRESET_DEFAULT,
"dict_size": 1 << 21, # !important
}
])
serializer = Serializer()
_player_name = bytes([len(self.player_name)]) + self.player_name
_lifebar_graph = bytes([len(self.lifebar_graph)]) + self.lifebar_graph
_compressed_replay = bytes([len(compressed_replay)]) + compressed_replay
attribs = [ a for a in self.__dir__() if not a.startswith("__") ]
for attrib in attribs:
if attrib == "end_of_attributes":
break
serializer.add( self.__getattribute__(attrib) ) # Add value to serializer
with open(filename, "wb") as f:
# Yes... Please never do this
data = struct.pack(b"<BiBB32sB%dsBB32s6HiH?iB%dsQ%dsQ" % (len(_player_name), len(_lifebar_graph), len(_compressed_replay)),
self.mode,
self.osu_version,
11, len(self.beatmap_hash), self.beatmap_hash,
11, _player_name,
11, len(self.score_hash), self.score_hash,
self.score_300s, self.score_100s, self.score_50s, self.score_gekis, self.score_katus, self.score_miss,
self.score,
self.combo,
self.perfect,
self.mods,
11, _lifebar_graph,
self.timestamp,
_compressed_replay,
self.online_score_id
)
f.write(data)
f.write( serializer.flush() )
print(serializer.flush())

View File

@ -4,7 +4,7 @@ from osuRepy.replay import Replay
replay = Replay()
replay.write( ReplayFrame(2, 3, 4, 4) )
replay.score_300s = 20
replay.timestamp = 635592412124124124 # Wed Feb 11 2015 09:03:52 GMT+0100 (Central European Standard Time)
replay.score_300s.value = 20
replay.timestamp.value = 635592412124124124 # Wed Feb 11 2015 09:03:52 GMT+0100 (Central European Standard Time)
replay.save("test.osr")