sampy3/sampy/network/query.py

186 lines
6.7 KiB
Python
Raw Normal View History

2023-03-15 07:12:47 +01:00
from __future__ import annotations
import functools
import logging
import struct
from typing import TYPE_CHECKING, Callable, Dict, Tuple
if TYPE_CHECKING:
from sampy.server import Server
2023-03-19 05:17:44 +01:00
# Holds a dict of handlers for different packet IDs. Since every query packet id is a single byte, we use an int here.
2023-03-15 07:12:47 +01:00
HANDLERS: Dict[int, Callable[[Server, bytes, Tuple[str, int]], bool]] = {}
2023-03-19 05:17:44 +01:00
# Decorator that adds the function to the HANDLERS dict.
def handler(packet_id: bytes):
if len(packet_id) > 1:
2023-03-15 07:12:47 +01:00
raise Exception("Query opcode length cannot be bigger then 1")
def outer(func):
@functools.wraps(func)
def inner(
server: Server, packet: bytes, addr: Tuple[str, int], *args, **kwargs
):
return func(server, packet, addr, *args, **kwargs)
2023-03-19 05:17:44 +01:00
HANDLERS[packet_id[0]] = func
logging.debug("Added Query handler: %s -> %s" % (packet_id, func))
2023-03-15 07:12:47 +01:00
return inner
return outer
class Query:
2023-03-19 05:17:44 +01:00
"""Query handler
Reference: https://team.sa-mp.com/wiki/Query_Mechanism.html (Unable to archive due to https://sa-mp.com/robots.txt disallowing ia_archiver)
The Query protocol is *mostly* used for the samp server browser and other systems that queries for server info.
The exception to this is would be the "player_details"(d) packet on version 0.2.x and below where an ingame player pressing TAB would use this packet.
Structure:
- "SAMP" header (4 bytes)
- Server's ipv4 (4 bytes)
- Server's port (2 bytes)
- Packet type (1 byte)
- Packet data
Note that the server and client will both use the same first 4 parts when communicating.
Not all packets have packet data, and most packets doesn't have data when the client sends it as its a "request".
The server will always have packet data attached.
List of packets:
- i: Information packet. This includes hostname, players (online/max), mode, language and whether password is required.
- r: Rules packet. This is a list of "rules". The name "rules" is subjective in this case, as most are general optional information.
- c: Client list packet. This is a list of players and scores. Players being just their username and score a number.
- d: Detailed player list packet. Extends the client list packet with player id and ping.
- p: Ping packet. A client uses this packet with 4 random bytes and measures how long it takes before it gets the same packet back.
- x: Remote console packet. A client can send and receive anything on this packet. Usually used for remote console.
Additional findings:
- On info packet, strings can not be of size 0. This will make parsing of the rest of the packet fail.
- This is strange due to how we are sending the length of the string first, which should allow this...
- Fix: If string size is 0, we replace the string with a single space (Considering using a NULL byte, but unsure if this could cause other issues)
"""
2023-03-15 07:12:47 +01:00
HEADER_LENGTH = 11
@staticmethod
async def on_packet(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
if len(packet) < 11: # Packet is too small
return False
2023-03-19 05:17:44 +01:00
magic, _ip, _port, packet_id = struct.unpack(
2023-03-15 07:12:47 +01:00
b"<4sIHB", packet[: Query.HEADER_LENGTH]
) # Unpack packet
if magic != b"SAMP": # Validate magic
return False
2023-03-19 05:17:44 +01:00
return HANDLERS.get(packet_id, lambda *_: False)(server, packet, addr)
2023-03-15 07:12:47 +01:00
@handler(b"i")
@staticmethod
def info(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
2023-03-19 05:17:44 +01:00
hostname = server.config.hostname.encode()
mode = server.config.mode.encode()
language = server.config.language.encode()
if len(hostname) == 0:
hostname = b" "
if len(mode) == 0:
mode = b" "
if len(language) == 0:
language = b" "
2023-03-15 07:12:47 +01:00
packet += struct.pack(
2023-03-19 05:17:44 +01:00
b"<?HHI%dsI%dsI%ds" % (len(hostname), len(mode), len(language)),
2023-03-15 07:12:47 +01:00
len(server.config.password) != 0,
len(server.players),
server.config.max_players,
2023-03-19 05:17:44 +01:00
len(hostname),
hostname,
len(mode),
mode,
len(language),
language,
2023-03-15 07:12:47 +01:00
)
server.sendto(packet, addr)
return True
@handler(b"r")
@staticmethod
def rules(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
rules = server.config.rules
rules["version"] = server.protocol.VERSION # Add game version (read-only)
rules["worldtime"] = ""
rules["weather"] = "10"
packet += struct.pack(b"<H", len(rules))
for item in (item.encode() for pair in rules.items() for item in pair):
packet += struct.pack(b"<B%ds" % len(item), len(item), item)
server.sendto(packet, addr)
return True
@handler(b"c")
@staticmethod
def client_list(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
players = server.players
packet += struct.pack(b"<H", len(players))
for player in players:
username = player.username.encode()
packet += struct.pack(b"<B%dsI", len(username), username, player.score)
server.sendto(packet, addr)
return True
@handler(b"d")
@staticmethod
def player_details(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
packet = packet[: Query.HEADER_LENGTH] # Discard additional data if passed
players = server.players
packet += struct.pack(b"<H", len(players))
for player in players:
username = player.username.encode()
packet += struct.pack(
b"<BB%dsII",
player.id,
len(username),
username,
player.score,
player.ping,
)
server.sendto(packet, addr)
return True
@handler(b"p")
@staticmethod
def ping(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
if (
len(packet) < Query.HEADER_LENGTH + 4
): # Packet is too small (Missing random)
return False
packet = packet[
: Query.HEADER_LENGTH + 4
] # Discard additional data if passed (+4 to include random)
server.sendto(packet, addr)
return True
@handler(b"x")
@staticmethod
def rcon(server: Server, packet: bytes, addr: Tuple[str, int]) -> bool:
return False # TODO