sampy3/sampy/network/query.py
2023-03-15 07:12:47 +01:00

146 lines
4.4 KiB
Python

from __future__ import annotations
import functools
import logging
import struct
from typing import TYPE_CHECKING, Callable, Dict, Tuple
if TYPE_CHECKING:
from sampy.server import Server
HANDLERS: Dict[int, Callable[[Server, bytes, Tuple[str, int]], bool]] = {}
def handler(opcode: bytes):
if len(opcode) > 1:
raise Exception("Query opcode length cannot be bigger then 1")
def outer(func):
@functools.wraps(func)
def inner(
server: Server, packet: bytes, addr: Tuple[str, int], *args, **kwargs
):
return func(server, packet, addr, *args, **kwargs)
HANDLERS[opcode[0]] = func
logging.debug("Added query handler:", opcode, func)
return inner
return outer
class Query:
HEADER_LENGTH = 11
@staticmethod
async def on_packet(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
if len(packet) < 11: # Packet is too small
return False
magic, _ip, _port, opcode = struct.unpack(
b"<4sIHB", packet[: Query.HEADER_LENGTH]
) # Unpack packet
if magic != b"SAMP": # Validate magic
return False
return HANDLERS.get(opcode, lambda *_: False)(server, packet, addr)
@handler(b"i")
@staticmethod
def info(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
len_hostname = len(server.config.hostname)
len_mode = len(server.config.mode)
len_language = len(server.config.language)
packet += struct.pack(
b"<?HHI%dsI%dsI%ds" % (len_hostname, len_mode, len_language),
len(server.config.password) != 0,
len(server.players),
server.config.max_players,
len_hostname,
server.config.hostname.encode(),
len_mode,
server.config.mode.encode(),
len_language,
server.config.language.encode(),
)
server.sendto(packet, addr)
return True
@handler(b"r")
@staticmethod
def rules(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
rules = server.config.rules
rules["version"] = server.protocol.VERSION # Add game version (read-only)
rules["worldtime"] = ""
rules["weather"] = "10"
packet += struct.pack(b"<H", len(rules))
for item in (item.encode() for pair in rules.items() for item in pair):
packet += struct.pack(b"<B%ds" % len(item), len(item), item)
server.sendto(packet, addr)
return True
@handler(b"c")
@staticmethod
def client_list(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
players = server.players
packet += struct.pack(b"<H", len(players))
for player in players:
username = player.username.encode()
packet += struct.pack(b"<B%dsI", len(username), username, player.score)
server.sendto(packet, addr)
return True
@handler(b"d")
@staticmethod
def player_details(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
players = server.players
packet += struct.pack(b"<H", len(players))
for player in players:
username = player.username.encode()
packet += struct.pack(
b"<BB%dsII",
player.id,
len(username),
username,
player.score,
player.ping,
)
server.sendto(packet, addr)
return True
@handler(b"p")
@staticmethod
def ping(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
if (
len(packet) < Query.HEADER_LENGTH + 4
): # Packet is too small (Missing random)
return False
packet = packet[
: Query.HEADER_LENGTH + 4
] # Discard additional data if passed (+4 to include random)
server.sendto(packet, addr)
return True
@handler(b"x")
@staticmethod
def rcon(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
return False # TODO