diff --git a/sampy/server.py b/sampy/server/__init__.py similarity index 93% rename from sampy/server.py rename to sampy/server/__init__.py index 1132eda..474da09 100644 --- a/sampy/server.py +++ b/sampy/server/__init__.py @@ -2,8 +2,10 @@ import socket import asyncio from select import select # This is straight up magic -from .struct.server import ServerConfig -from .client import Client +from ..struct.server import ServerConfig +from ..client import Client + +from .compression import Compressor import logging logger = logging.getLogger(__name__) @@ -13,6 +15,8 @@ class Server: self.config = config self.clients = {} self.rcon_clients = {} + + self.compressor = Compressor(self.config) async def create_socket(self): self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) diff --git a/sampy/server/compression.py b/sampy/server/compression.py new file mode 100644 index 0000000..db3c507 --- /dev/null +++ b/sampy/server/compression.py @@ -0,0 +1,48 @@ +from ..shared import glob +from ..struct.server import ServerConfig + +import logging +logger = logging.getLogger(__name__) + +# Found @ addr 0x004C88E0 +LOOKUP_TABLE = b"\xb4b\x07\xe5\x9d\xafc\xdd\xe3\xd0\xcc\xfe\xdc\xdbk.j@\xabG\xc9\xd1S\xd5 \x91\xa5\x0eJ\xdf\x18\x89\xfdo%\x12\xb7\x13w\x00e6mI\xecW*\xa9\x11_\xfax\x95\xa4\xbd\x1e\xd9yD\xcd\xde\x81\xeb\t>\xf6\xee\xda\x7f\xa3\x1a\xa7-\xa6\xad\xc1F\x93\xd2\x1b\x9c\xaa\xd7NKML\xf3\xb84\xc0\xca\x88\xf4\x94\xcb\x0490\x82\xd6s\xb0\xbf\"\x01AnH,\xa8u\xb1\n\xae\x9f\'\x80\x10\xce\xf0)(\x85\r\x05\xf75\xbb\xbc\x15\x06\xf5`q\x03\x1f\xeaZ3\x92\x8d\xe7\x90[\xe9\xcf\x9e\xd3]\xed1\x1c\x0bR\x16Q\x0f\x86\xc5h\x9b!\x0c\x8bB\x87\xffO\xbe\xc8\xe8\xc7\xd4z\xe0U/\x8a\x8e\xba\x987\xe4\xb28\xa1\xb62\x83:{\x84 bytes: + """Decompress client packet. + This is actually a deobfuscation as there is no compression involved anymore + as zlib was removed and just swapped with this implementation after the leak. + + Arguments: + bytestream {bytes} -- Bytes sent by client + """ + checksum, data = bytestream[0], bytestream[1:] + + data = self.xor_every_other_byte(self.get_port_xor_key(), data) + data = self.run_though_lookup_table(data) + + if checksum != self.calc_checksum(data): + logger.error("Checksum failed!") + raise Exception("Checksum fail") + + return data + + def xor_every_other_byte(self, xor: int, bytestream: bytes) -> bytes: + for i in range(1, len(bytestream), 2): + bytestream[i] ^= xor + return bytestream + + def run_though_lookup_table(self, bytestream: bytes) -> bytes: + return bytes(LOOKUP_TABLE[b] for b in bytestream) + + def calc_checksum(self, bytestream: bytes) -> int: + checksum = 0 + for byte in bytestream: + checksum ^= byte & 0xAA + return checksum + + def get_port_xor_key(self) -> int: + return (self.config.port ^ 0xCCCC) & 0xFF