BitStreams
This commit is contained in:
parent
1b78fb98bc
commit
483587fd3b
|
@ -13,7 +13,7 @@
|
|||
"host": "0.0.0.0",
|
||||
"port": 7777,
|
||||
"hostname": "Python > C",
|
||||
"password": "test",
|
||||
"password": "",
|
||||
"rcon_password": "changeme",
|
||||
"max_players": 50,
|
||||
"mode": "debug",
|
||||
|
|
|
@ -62,7 +62,7 @@ BitStream::ReadBit() # hasAcks
|
|||
CreateInternalPacketFromBitStream()
|
||||
caller_addr: 0x0045FB0D
|
||||
time: 22 39423557
|
||||
bitStream->numberOfBitsAllocated: 176
|
||||
bitStream->numberOfBitsAllocated: 176 # 22 bytes
|
||||
bitStream->numberOfBitsUsed: 176
|
||||
bitStream->readOffset: 1
|
||||
bitStream->data:
|
||||
|
|
|
@ -96,7 +96,7 @@ class QueryClient(BaseClient):
|
|||
p["ping"]
|
||||
]
|
||||
|
||||
return packet + struct.pack(b"<H" + (b"cc%dsII" * data[0]) % tuple(len(p["nick"]) for p in players), *data)
|
||||
return packet + struct.pack(b"<H" + (b"BB%dsII" * data[0]) % tuple(len(p["nick"]) for p in players), *data)
|
||||
|
||||
async def query_p(self, packet: bytes) -> bytes:
|
||||
return packet
|
||||
|
|
|
@ -7,91 +7,103 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
class Bitstream:
|
||||
def __init__(self, data: bytes = b""):
|
||||
self.offset = 0
|
||||
self.buffer = array("B", data)
|
||||
self._offset = 0
|
||||
self._buffer = bytearray(data) # array("B", data)
|
||||
|
||||
# This is not finished.. would like to implement bit_length support
|
||||
def write(self, value: bool):
|
||||
offset = self.offset % 8
|
||||
def _can_access_bits(self, bits_to_access: int) -> bool:
|
||||
return self._offset + bits_to_access <= len(self._buffer) << 3
|
||||
|
||||
if self.offset // 8 == len(self.buffer):
|
||||
self.buffer.append(0)
|
||||
def write_bit(self, value: bool) -> bool:
|
||||
if not self._can_access_bits(1):
|
||||
return False
|
||||
|
||||
self.offset += 1
|
||||
mask = (1 << (7 - (self._offset % 8)))
|
||||
if value:
|
||||
self._buffer[self._offset >> 3] |= mask
|
||||
else:
|
||||
self._buffer[self._offset >> 3] &= ~mask
|
||||
|
||||
if value == False:
|
||||
return
|
||||
self._offset += 1
|
||||
return True
|
||||
|
||||
mask = 1 << (7 - offset)
|
||||
self.buffer[-1] |= mask
|
||||
def read_bit(self) -> (bool, bool):
|
||||
if not self._can_access_bits(1):
|
||||
return False, False
|
||||
|
||||
"""
|
||||
def write(self, value: int, bit_length: int):
|
||||
offset = self.offset % 8
|
||||
mask = (1 << (7 - (self._offset % 8)))
|
||||
value = self._buffer[self._offset >> 3] & mask
|
||||
|
||||
shifts = offset + bit_length
|
||||
while shifts > 8:
|
||||
self._offset += 1
|
||||
return True, value > 0
|
||||
|
||||
def write(self, value: bytes, bit_length: int) -> bool:
|
||||
if not self._can_access_bits(bit_length):
|
||||
return False
|
||||
|
||||
if offset == 0:
|
||||
self.buffer.append(0)
|
||||
if len(value) << 3 < bit_length:
|
||||
return False
|
||||
|
||||
v = self.buffer[buffer_index]
|
||||
byte_from = self._offset >> 3
|
||||
byte_to = (self._offset + bit_length + 7) >> 3
|
||||
|
||||
mask = (1 << ((self.offset % 8) + 1))
|
||||
"""
|
||||
bits_written = 0
|
||||
|
||||
for byte_index in range(byte_from, byte_to, 1):
|
||||
byte = self._buffer[byte_index]
|
||||
for bit_index in range(self._offset % 8, 8, 1):
|
||||
mask = 1 << (7 - bit_index)
|
||||
bit = value[bits_written >> 3] & (1 << ((7 - bits_written) % 8))
|
||||
if bit:
|
||||
byte |= mask
|
||||
else:
|
||||
byte &= ~mask
|
||||
|
||||
def read(self, bit_length: int) -> int:
|
||||
byte_read_from = self.offset // 8
|
||||
byte_read_to = (self.offset + bit_length - 1) // 8
|
||||
bytes_to_read = byte_read_to - byte_read_from + 1
|
||||
self._offset += 1
|
||||
bits_written += 1
|
||||
|
||||
if byte_read_to >= len(self.buffer):
|
||||
logger.debug("byte_read_to: %d" % byte_read_to)
|
||||
logger.debug("len(self.buffer): %d" % len(self.buffer))
|
||||
raise Exception("Reading beyond the buffer")
|
||||
if bits_written >= bit_length:
|
||||
break
|
||||
|
||||
self._buffer[byte_index] = byte
|
||||
|
||||
return True
|
||||
|
||||
def read(self, bit_length: int) -> (bool, bytes):
|
||||
if not self._can_access_bits(bit_length):
|
||||
return False, b""
|
||||
|
||||
byte_from = self._offset >> 3
|
||||
byte_to = (self._offset + bit_length + 7) >> 3
|
||||
|
||||
# Since python doesnt have a max number size,
|
||||
# we can read the bits we need and store them into a number,
|
||||
# then convert the number to bytes (as they are all in the same for python)
|
||||
value = 0
|
||||
shifts = bytes_to_read - 1
|
||||
while shifts >= 0:
|
||||
value += self.buffer[byte_read_to - shifts] << (shifts * 8)
|
||||
shifts -= 1
|
||||
|
||||
value <<= self.offset % 8
|
||||
value &= (1 << (8 * bytes_to_read)) - 1
|
||||
value >>= (bytes_to_read * 8) - bit_length
|
||||
for byte_index in range(byte_from, byte_to, 1):
|
||||
for bit_index in range(self._offset % 8, 8, 1):
|
||||
mask = 1 << (7 - bit_index)
|
||||
bit = self._buffer[self._offset >> 3] & (1 << ((7 - self._offset) % 8))
|
||||
value = (value << 1) + bit
|
||||
|
||||
self.offset += bit_length
|
||||
return value
|
||||
self._offset += 1
|
||||
|
||||
def read_compressed(self, bit_length: int, unsigned: bool):
|
||||
if self._offset >= bit_length:
|
||||
break
|
||||
|
||||
#if self._offset % 8 == 0:
|
||||
#value +=
|
||||
|
||||
return True, value.to_bytes((bit_length + 7) >> 3, "big") # bytes(value)
|
||||
|
||||
def read_compressed(self):
|
||||
pass
|
||||
|
||||
def read_aligned_bytes(self, byte_length: int) -> bytes:
|
||||
if byte_length <= 0:
|
||||
return b""
|
||||
|
||||
self.align_to_byte()
|
||||
|
||||
if self.offset + (byte_length * 8) > len(self.buffer):
|
||||
logger.debug("Reading beyond the buffer")
|
||||
return b""
|
||||
|
||||
byte_read_from = self.offset // 8
|
||||
byte_read_to = byte_read_from + byte_length
|
||||
self.offset += byte_length
|
||||
|
||||
return self.buffer[byte_read_from:byte_read_to + 1]
|
||||
|
||||
def align_to_byte(self):
|
||||
if self.offset % 8 != 0:
|
||||
self.offset += 8 - (self.offset % 8)
|
||||
def pretty(self) -> str:
|
||||
b = bytearray(" " + " ".join(" ".join(format(c, "08b")) for c in self._buffer) + " ", "ascii")
|
||||
m = self._offset * 2 + (self._offset >> 3) - (self._offset // (max(1, len(self._buffer)) * 8))
|
||||
b[m] = 124
|
||||
return b.decode()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
b = bytearray(" " + " ".join(" ".join(format(c, "08b")) for c in self.buffer) + " ", "ascii")
|
||||
m = self.offset * 2 + (self.offset // 8) - (self.offset // (max(1, len(self.buffer)) * 8))
|
||||
#b[m] = 91 # [
|
||||
#b[m + 2] = 93 # ]
|
||||
b[m] = 124 # |
|
||||
return b.decode()
|
||||
return "<Bitstream addr:0x%012x offset:%d len:%d>" % (id(self), self._offset, len(self._buffer) << 3)
|
||||
|
|
|
@ -22,7 +22,7 @@ class Server:
|
|||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.socket.bind((self.config.host, self.config.port))
|
||||
self.socket.setblocking(0) # TODO: Check if needed? I dont fully understand this "feature"
|
||||
self.socket.setblocking(0)
|
||||
logger.debug("Socket created")
|
||||
|
||||
async def on_command(self, cmd: str):
|
||||
|
|
103
test.py
103
test.py
|
@ -1,30 +1,85 @@
|
|||
from sampy.raknet.bitstream import Bitstream
|
||||
|
||||
a = Bitstream()
|
||||
"""
|
||||
a.buffer.append(0x42)
|
||||
a.buffer.append(0x13)
|
||||
a.buffer.append(0xFF)
|
||||
a.buffer.append(0xAC)
|
||||
a.buffer.append(0x00)
|
||||
a.buffer.append(0x69)
|
||||
"""
|
||||
"""
|
||||
a.write(1)
|
||||
a.write(0)
|
||||
a.write(1)
|
||||
a.write(1)
|
||||
a.write(1)
|
||||
a.write(1)
|
||||
a.write(0)
|
||||
a.write(1)
|
||||
bitstream = Bitstream(b"\x00" * 4)
|
||||
MAGIC_BYTES = b"\x01\xff\x02"
|
||||
MAGIC_BITS = [int(bit) for bit in " ".join([format(byte, "08b") for byte in MAGIC_BYTES]) if bit in ("0", "1")]
|
||||
fail = 0
|
||||
|
||||
a.write(1)
|
||||
"""
|
||||
def print_pretty(prefix, msg):
|
||||
prefix += " " * (40 - len(prefix))
|
||||
print(prefix, msg)
|
||||
|
||||
data = [int(x, 16) for x in "00 00 42 90 0b 61 61 61 61 62 62 62 62 63 63 63 63 66 66 66 66 00".split(" ")]
|
||||
def reset_bitstream():
|
||||
bitstream._buffer = bytearray(b"\x00" * len(bitstream._buffer))
|
||||
bitstream._offset = 0
|
||||
|
||||
for b in data:
|
||||
a.buffer.append(b)
|
||||
def test_write_bit(var_bits):
|
||||
global fail
|
||||
print_pretty("<Test bitstream.write_bit:in>", bitstream.pretty())
|
||||
for bit in var_bits:
|
||||
if not bitstream.write_bit(bit):
|
||||
print_pretty("<Test bitstream.write_bit>", "Failed")
|
||||
fail += 1
|
||||
print_pretty("<Test bitstream.write_bit:out>", bitstream.pretty())
|
||||
|
||||
print(a)
|
||||
def test_read_bit(var_compare_bits):
|
||||
global fail
|
||||
print_pretty("<Test bitstream.read_bit:in>", bitstream.pretty())
|
||||
for bit in var_compare_bits:
|
||||
ok, bit2 = bitstream.read_bit()
|
||||
if not ok:
|
||||
print_pretty("<Test bitstream.read_bit>", "Failed (ok)")
|
||||
fail += 1
|
||||
continue
|
||||
if bit2 != bit:
|
||||
print_pretty("<Test bitstream.read_bit>", "Failed (match)")
|
||||
fail += 1
|
||||
continue
|
||||
print_pretty("<Test bitstream.read_bit:out>", bitstream.pretty())
|
||||
|
||||
def test_write(var_bytes):
|
||||
global fail
|
||||
print_pretty("<Test bitstream.write:in>", bitstream.pretty())
|
||||
if not bitstream.write(var_bytes, len(var_bytes) << 3):
|
||||
print_pretty("<Test bitstream.write>", "Failed")
|
||||
fail += 1
|
||||
print_pretty("<Test bitstream.write:out>", bitstream.pretty())
|
||||
|
||||
def test_read(var_compare_bytes):
|
||||
global fail
|
||||
print_pretty("<Test bitstream.read:in>", bitstream.pretty())
|
||||
ok, bytez = bitstream.read(len(var_compare_bytes) << 3)
|
||||
if ok:
|
||||
print(var_compare_bytes, bytez)
|
||||
if bytez != var_compare_bytes:
|
||||
print_pretty("<Test bitstream.read>", "Failed (match)")
|
||||
fail += 1
|
||||
else:
|
||||
print_pretty("<Test bitstream.read>", "Failed (ok)")
|
||||
fail += 1
|
||||
print_pretty("<Test bitstream.read:out>", bitstream.pretty())
|
||||
|
||||
def test_rw_bit(offset):
|
||||
bitstream._offset = offset
|
||||
test_write_bit(MAGIC_BITS)
|
||||
bitstream._offset = offset
|
||||
test_read_bit(MAGIC_BITS)
|
||||
|
||||
def test_rw_bytes(offset):
|
||||
bitstream._offset = offset
|
||||
test_write(MAGIC_BYTES)
|
||||
bitstream._offset = offset
|
||||
test_read(MAGIC_BYTES)
|
||||
|
||||
# Test all edges
|
||||
for i in range(9):
|
||||
reset_bitstream()
|
||||
test_rw_bit(i)
|
||||
|
||||
reset_bitstream()
|
||||
|
||||
for i in range(9):
|
||||
reset_bitstream()
|
||||
test_rw_bytes(i)
|
||||
|
||||
print("Failed: %d" % fail)
|
Loading…
Reference in New Issue
Block a user