diff --git a/sampy/network/game.py b/sampy/network/game.py index 38c12b8..7b4cddd 100644 --- a/sampy/network/game.py +++ b/sampy/network/game.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import TYPE_CHECKING, Tuple if TYPE_CHECKING: @@ -9,4 +10,66 @@ if TYPE_CHECKING: class Game: @staticmethod async def on_packet(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool: + packet = Obfuscation.deobfuscate(server, packet) + + # TODO return False + + +class Obfuscation: + # Found @ addr 0x004C88E0 in windows server executable + LOOKUP_TABLE = bytes.fromhex( + """ + b46207e59daf63dde3d0ccfedcdb6b2e6a40ab47c9d153d52091a50e4adf1889 + fd6f2512b713770065366d49ec572aa9115ffa7895a4bd1ed97944cdde81eb09 + 3ef6eeda7fa31aa72da6adc14693d21b9caad74e4b4d4cf3b834c0ca88f494cb + 04393082d673b0bf2201416e482ca875b10aae9f278010cef02928850d05f735 + bbbc1506f56071031fea5a33928de7905be9cf9ed35ded311c0b5216510f86c5 + 689b210c8b4287ff4fbec8e8c7d47ae0552f8a8eba9837e4b238a1b632833a7b + 843c61fb8c143d433b1dc3a296b3f8c4f2262bd87cfc232466ef6964505459f1 + a074acc67db5e6e2c27e67175ee1b93f6c700899455676f99a9719725c028f58 + """ + ) + + # I think this used to be zlib compression, but has been swapped out with obfuscation instead + @staticmethod + def deobfuscate(server: Server, packet: bytes) -> bytes: + checksum, data = packet[0], bytearray(packet[1:]) + + data = Obfuscation.xor_every_other_byte( + Obfuscation.get_port_xor_key(server), data + ) + data = bytes(Obfuscation.LOOKUP_TABLE[b] for b in packet) + + if checksum != Obfuscation.calc_checksum(data): + logging.error("Checksum failed!") + raise Exception("Checksum fail") + + return bytes(data) + + @staticmethod + def obfuscate(server: Server, packet: bytes) -> bytes: + data = bytes(Obfuscation.LOOKUP_TABLE.index(b) for b in packet) + data = Obfuscation.xor_every_other_byte( + Obfuscation.get_port_xor_key(server), data + ) + + checksum = Obfuscation.calc_checksum(data) + return bytes([checksum]) + data + + @staticmethod + def xor_every_other_byte(xor: int, packet: bytearray) -> bytearray: + for i in range(1, len(packet), 2): + packet[i] ^= xor + return packet + + @staticmethod + def calc_checksum(packet: bytearray) -> int: + checksum = 0 + for byte in packet: + checksum ^= byte & 0xAA + return checksum + + @staticmethod + def get_port_xor_key(server: Server) -> int: + return (server.config.port ^ 0xCCCC) & 0xFF