from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING, Optional, Tuple, Type

from sampy.client.player import Player
from sampy.config import Config

if TYPE_CHECKING:
    from sampy.network.protocol import Protocol


class UDPProtocol:
    """Datagram endpoint that acts as its own asyncio protocol object.

    The instance is handed to ``loop.create_datagram_endpoint`` as the
    protocol (the factory simply returns ``self``), so asyncio invokes
    ``connection_made``, ``connection_lost`` and ``datagram_received``
    directly on it.
    """

    # Stays ``None`` until ``connection_made`` runs. The default is what
    # lets ``stop()`` detect an unstarted endpoint instead of failing with
    # an AttributeError on a missing attribute.
    transport: Optional[asyncio.transports.DatagramTransport] = None

    def __init__(self, protocol: Protocol, local_addr: Tuple[str, int]):
        """Store the packet handler and the ``(host, port)`` to bind to."""
        self.protocol = protocol
        self.local_addr = local_addr

    def start(self):
        """Bind the UDP endpoint on ``local_addr``.

        Runs the event loop until the endpoint has been created; it does
        not keep the loop running afterwards.
        """
        loop = asyncio.get_event_loop()
        logging.debug("Creating datagram endpoint")
        connect = loop.create_datagram_endpoint(
            lambda: self,
            local_addr=self.local_addr,
        )
        loop.run_until_complete(connect)

    def stop(self):
        """Close the transport.

        Raises:
            Exception: if no transport has been set yet, i.e. the endpoint
                was never started.
        """
        # TODO: Shutdown code
        if self.transport is None:
            raise Exception("Cannot stop a server that hasn't been started")
        logging.debug("Shutting down")
        self.transport.close()

    def connection_made(self, transport: asyncio.transports.DatagramTransport):
        """Remember the transport handed over by asyncio."""
        logging.debug("UDP Protocol: connection_made")
        self.transport = transport

    def connection_lost(self, exc: Exception | None):
        """Log that the endpoint was closed; ``exc`` is not inspected."""
        logging.debug("UDP Protocol: connection_lost")

    def datagram_received(self, data: bytes, addr: Tuple[str, int]):
        """Handle an incoming datagram; subclasses must override this."""
        raise NotImplementedError

    def sendto(self, data: bytes | bytearray | memoryview, addr: Tuple[str, int]):
        """Send ``data`` to ``addr`` through the underlying transport."""
        self.transport.sendto(data, addr)


class Server(UDPProtocol):
    """UDP endpoint that forwards every datagram to a ``Protocol`` instance."""

    config: Config

    def __init__(self, protocol: Type[Protocol], config: Optional[Config] = None):
        """Instantiate ``protocol`` and read the bind address from ``config``.

        Args:
            protocol: Protocol class; it is called with no arguments to
                build the handler instance.
            config: Configuration supplying ``host`` and ``port`` in the
                ``sampy`` section. When omitted, ``Config()`` is used and
                a warning is logged.
        """
        if config is None:
            config = Config()
            # logging.warn is a deprecated alias (removed in Python 3.13).
            logging.warning("Server was initialized with default config")
        super().__init__(
            protocol=protocol(),
            local_addr=(
                config.get("sampy", "host"),
                config.getint("sampy", "port"),
            ),
        )
        self.config = config

    def datagram_received(self, data: bytes, addr: Tuple[str, int]):
        """Schedule ``protocol.on_packet(self, data, addr)`` as a task."""
        loop = asyncio.get_event_loop()
        loop.create_task(self.protocol.on_packet(self, data, addr))

    @property
    def players(self) -> list[Player]:
        """Players known to the server; currently always an empty list."""
        # TODO
        return []


class InteractiveServer(Server):
    """Server that also accepts console commands read from standard input."""

    def __init__(self, protocol: Type[Protocol], config: Optional[Config] = None):
        """Set up the server and schedule the console input loop as a task."""
        super().__init__(protocol=protocol, config=config)
        loop = asyncio.get_event_loop()
        loop.create_task(self.run_input_loop())

    async def run_input_loop(self):
        """Read console commands until a shutdown command is entered.

        ``input`` blocks, so each read is handed to the loop's default
        executor to keep the event loop responsive. On ``quit``, ``exit``
        or ``stop`` the server is stopped, the event loop is stopped, and
        the coroutine returns. Any other input is ignored.
        """
        loop = asyncio.get_event_loop()
        while True:
            command = await loop.run_in_executor(None, input)
            if command in ("quit", "exit", "stop"):
                self.stop()
                loop.stop()
                return