Files
sampy/sampy/raknet/bitstream.py
2021-04-25 17:39:47 +02:00

121 lines
3.5 KiB
Python

import ctypes as c
from array import array
from typing import Tuple
import logging
logger = logging.getLogger(__name__)
class Bitstream:
def __init__(self, data: bytes = b""):
self._offset = 0
self._buffer = bytearray(data) # array("B", data)
@property
def offset(self) -> int:
return self._offset
@property
def length(self) -> int:
return len(self._buffer)
def _can_access_bits(self, bits_to_access: int) -> bool:
return self._offset + bits_to_access <= len(self._buffer) << 3
def write_bit(self, value: bool) -> bool:
if not self._can_access_bits(1):
return False
mask = (1 << (7 - (self._offset % 8)))
if value:
self._buffer[self._offset >> 3] |= mask
else:
self._buffer[self._offset >> 3] &= ~mask
self._offset += 1
return True
def read_bit(self) -> Tuple[bool, bool]:
if not self._can_access_bits(1):
return False, False
mask = (1 << (7 - (self._offset % 8)))
value = self._buffer[self._offset >> 3] & mask
self._offset += 1
return True, value > 0
def write(self, value: bytes, bit_length: int) -> bool:
if not self._can_access_bits(bit_length):
return False
if len(value) << 3 < bit_length:
return False
byte_from = self._offset >> 3
byte_to = (self._offset + bit_length + 7) >> 3
bits_written = 0
for byte_index in range(byte_from, byte_to, 1):
byte = self._buffer[byte_index]
for bit_index in range(self._offset % 8, 8, 1):
mask = 1 << (7 - bit_index)
bit = value[bits_written >> 3] & (1 << ((7 - bits_written) % 8))
if bit:
byte |= mask
else:
byte &= ~mask
self._offset += 1
bits_written += 1
if bits_written >= bit_length:
break
self._buffer[byte_index] = byte
return True
def read(self, bit_length: int) -> Tuple[bool, bytes]:
if not self._can_access_bits(bit_length):
return False, b""
byte_from = self._offset >> 3
byte_to = (self._offset + bit_length + 7) >> 3
# Since python doesnt have a max number size,
# we can read the bits we need and store them into a number,
# then convert the number to bytes (as they are all in the same for python)
value = 0
for byte_index in range(byte_from, byte_to, 1):
for bit_index in range(self._offset % 8, 8, 1):
mask = 1 << (7 - bit_index)
bit = self._buffer[self._offset >> 3] & (1 << ((7 - self._offset) % 8))
value = (value << 1) + bit
self._offset += 1
if self._offset >= bit_length:
break
#if self._offset % 8 == 0:
#value +=
return True, value.to_bytes((bit_length + 7) >> 3, "big") # bytes(value)
def read_compressed(self):
pass
def pretty(self) -> str:
b = bytearray(" " + " ".join(" ".join(format(c, "08b")) for c in self._buffer) + " ", "ascii")
m = self._offset * 2 + (self._offset >> 3) - (self._offset // (max(1, len(self._buffer)) * 8))
b[m] = 124
return b.decode()
def __repr__(self) -> str:
return "<Bitstream addr:0x%012x offset:%d len:%d>" % (id(self), self._offset, len(self._buffer) << 3)