sampy/test.py

128 lines
3.6 KiB
Python
Raw Permalink Normal View History

2020-04-04 16:04:40 +02:00
from sampy.raknet.bitstream import Bitstream
from sampy.client.player import PlayerClient
2020-04-04 16:04:40 +02:00
import struct
class Packet:
    """A sample packet given as hex text, plus a bitstream over its bytes.

    ``data`` and ``is_client`` are stored exactly as passed in;
    ``bitstream`` is a ``Bitstream`` built from the decoded bytes.
    """

    def __init__(self, data: str, is_client: bool):
        """Decode *data* from hex and wrap the bytes in a ``Bitstream``.

        Args:
            data: Hexadecimal text such as ``"18 69 69"``.  It is handed to
                ``bytearray.fromhex``, so it is annotated as ``str`` (the
                form every packet in this file is written in).
            is_client: Stored unchanged.  Presumably marks packets that
                were sent by the client -- confirm against the capture.
        """
        self.data = data
        self.is_client = is_client
        self.bitstream = Bitstream(bytearray.fromhex(data))
# Sample packets: (space-separated hex bytes, is_client flag).  The list is
# indexed by position further down, so the order must not change.
PACKETS = [
    Packet(hex_bytes, is_client)
    for hex_bytes, is_client in (
        ("18 69 69", True),
        ("1a 18 be", False),
        ("18 71 d7", True),
        ("19 00", False),
        ("00 00 43 80 0b", True),
        ("e3 00 00", False),
        (
            "00 00 42 98 0c 11 33 30 45 39 33 39 33 33 36 39 42 35 36 38 43 32 00",
            False,
        ),
    )
]
#client = PlayerClient(None, "127.0.0.1", 7777)
#client.state = 1
def pbs(bitstream: Bitstream, prefix: str = ""):
    """Print *prefix* followed by the bitstream's ``pretty()`` rendering."""
    rendered = bitstream.pretty()
    print(prefix, rendered)
def deserialize(bitstream: Bitstream) -> bool:
    """Read a range list from *bitstream* and report whether it is well formed.

    Layout consumed: a compressed 16-bit entry count, then for each entry a
    one-bit "max equals min" flag, a 16-bit minimum and -- only when the flag
    is clear -- a 16-bit maximum.  The ranges are validated but neither
    stored nor returned.

    Returns:
        True when every entry was read and no maximum was below its minimum;
        False as soon as a read reports failure or a range is inverted.
    """
    # TODO: Something weird with range thing DS_RangeList.h@L92
    success, raw_count = bitstream.read_compressed(0x10, True)
    if not success:
        return False
    count, = struct.unpack(b"<H", raw_count)

    for _ in range(count):
        # The flag read is checked like every other read in this function,
        # so a truncated stream is reported at the first failing read.
        success, max_equal_to_min = bitstream.read_bit()
        if not success:
            return False

        success, raw_low = bitstream.read_bits(0x10)
        if not success:
            return False
        low, = struct.unpack(b"<H", raw_low)

        if max_equal_to_min:
            # Single-value range: the maximum is not transmitted.
            continue

        success, raw_high = bitstream.read_bits(0x10)
        if not success:
            return False
        high, = struct.unpack(b"<H", raw_high)
        if high < low:
            return False

    return True
def on_player_packet(packet: bytes):
    """Parse one raw packet, printing the bitstream state after each step.

    The first bit says whether the packet carries acknowledgements.  If it
    does, the acknowledgement list is deserialized and nothing else is
    parsed; otherwise the rest is handed to ``handle_internal_packet``.

    Returns:
        False when the acknowledgement list fails to deserialize; None in
        every other case.
    """
    stream = Bitstream(packet)
    pbs(stream, "Created")
    _, has_acks = stream.read_bit()
    pbs(stream, "has_acks")

    if not has_acks:
        if not handle_internal_packet(stream):
            print("Internal packet were not handled")
        return

    print("has_acks: True")
    if not deserialize(stream):
        return False
    return
def handle_internal_packet(bitstream: Bitstream) -> bool:
    """Parse one internal packet (header fields and payload) from *bitstream*.

    Fields are read in this order: 16-bit message number, 4-bit reliability,
    (only for reliabilities 7, 9 and 10) 5-bit ordering channel and 16-bit
    ordering index, 1-bit split flag, compressed 16-bit payload length in
    bits, then the byte-aligned payload.  The bitstream state is printed
    after every read.

    Returns:
        True when the payload was read.  False when fewer than 16 bits
        remain, when a checked read reports failure, or when the packet is
        a split packet (those are skipped).
    """
    if bitstream.length - bitstream.offset < 0x10:
        return False

    _, message_number = bitstream.read_bits(0x10)
    pbs(bitstream, "message_number")
    _, reliability = bitstream.read_bits(0x04)
    pbs(bitstream, "reliability")

    # NOTE(review): read_bits results are passed to struct.unpack elsewhere
    # in this file, which suggests a bytes-like value.  If so, this
    # comparison against ints never matches -- confirm what read_bits
    # returns for a 4-bit read.
    if reliability in (7, 9, 10):
        success, ordering_channel = bitstream.read_bits(0x05)
        pbs(bitstream, "ordering_channel")
        if not success:
            return False
        success, ordering_index = bitstream.read_bits(0x10)
        pbs(bitstream, "ordering_index")
        if not success:
            return False

    _, is_split_packet = bitstream.read_bit()
    pbs(bitstream, "is_split_packet")
    if is_split_packet:
        print("Skipping split packet")
        # TODO: set global split_packet index and count back to 0
        # Something I dont understand yet
        # Explicit False (rather than a bare return) to honour the declared
        # bool return type; callers only test truthiness.
        return False

    # TODO: ReadCompressed
    success, raw_bit_length = bitstream.read_compressed(0x10, True)
    pbs(bitstream, "data_bit_length")
    if not success:
        return False
    data_bit_length, = struct.unpack(b"<H", raw_bit_length)
    print("data_bit_length:", data_bit_length)

    # TODO: ReadAlignedBytes
    # Round the bit length up to whole bytes.
    success, data = bitstream.read_aligned_bytes((data_bit_length + 7) >> 3)
    pbs(bitstream, "data")
    if not success:
        return False
    print("data: %s" % data)

    # Report success explicitly: falling off the end returned None, which
    # the caller treats as "not handled".
    return True
# Run the parser over the raw buffer of the second-to-last sample packet.
on_player_packet(
    PACKETS[-2].bitstream._buffer,
)