Files
network-test-py/client/UDPClient.py

51 lines
1.5 KiB
Python

from SocketClient import SocketClient
import time
import socket
import select
class UDPClient(SocketClient):
    """UDP test client that tracks which sent datagrams were answered.

    Every payload passed to ``Send`` is suffixed with ``/<send time>``.  The
    send time is remembered until a datagram carrying the same timestamp is
    received; if none arrives within ``REPLY_TIMEOUT`` seconds the datagram
    is reported with a ``UDP_FAIL`` log line.

    ``self.ip``, ``self.port`` and ``self.logger`` are used here but not
    defined in this class; presumably ``SocketClient`` supplies them --
    confirm against that class.
    """

    # Minimum number of seconds between two scans for unanswered datagrams.
    STALE_CHECK_INTERVAL: float = 1.0
    # Seconds to wait for a reply before a datagram is reported as failed.
    REPLY_TIMEOUT: float = 10.0
    # Upper bound on datagrams consumed by one Receive() call, so a flood of
    # incoming traffic cannot stall the caller indefinitely.
    MAX_RECEIVE_BATCH: int = 64

    # Send times of datagrams still awaiting a reply (created per instance).
    _timestamps: set[float]
    # Time of the most recent stale scan (created per instance).
    _last_check: float

    def __init__(self, *args, **kwargs):
        """Create per-instance bookkeeping, then delegate to ``SocketClient``.

        All arguments are forwarded unchanged to the base initializer.
        """
        # A class-level ``set()`` would be shared by every client, letting one
        # instance report or clear another's datagrams.  The state is set
        # before delegating in case the base initializer already uses it.
        self._timestamps = set()
        self._last_check = time.time()
        super().__init__(*args, **kwargs)

    def Connect(self) -> bool:
        """Create the non-blocking UDP socket used by ``Send``/``Receive``.

        Returns:
            Always ``True``.
        """
        # Close a socket left over from an earlier Connect() so its file
        # descriptor is not leaked when the attribute is replaced.
        previous = getattr(self, "socket", None)
        if isinstance(previous, socket.socket):
            previous.close()
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setblocking(False)
        return True

    def Send(self, data: str):
        """Send ``data`` with a timestamp suffix and service pending replies.

        After sending, any datagrams that are already waiting are consumed
        and the stale scan is given a chance to run.

        Args:
            data: Payload text; ``/<send time>`` is appended before sending.
        """
        sent_at = time.time()
        payload = f"{data}/{sent_at}"
        self.socket.sendto(payload.encode(), (self.ip, self.port))
        self.logger.info("UDP_SENT %s" % payload)
        # NOTE: the pending collection is a set, so two sends that observe
        # the same clock value are tracked as a single entry.
        self._timestamps.add(sent_at)
        self.Receive()
        self.staleCheck()

    def Reconnect(self) -> bool:
        """Report success without doing anything.

        Returns:
            Always ``True``.
        """
        return True

    def Receive(self):
        """Consume waiting datagrams and mark their timestamps as answered.

        Never blocks: the socket is polled with a zero timeout and the method
        returns as soon as nothing is readable (or after
        ``MAX_RECEIVE_BATCH`` datagrams).  Datagrams without a parseable
        timestamp are logged as ``UDP_MALFORMED`` and skipped.
        """
        for _ in range(self.MAX_RECEIVE_BATCH):
            readable, _, _ = select.select([self.socket], [], [], 0)
            if not readable:
                return
            data, addr = self.socket.recvfrom(1024)
            # Undecodable bytes must not abort the sender; show them replaced.
            text = data.decode(errors="replace")
            timestamp = self._parse_timestamp(data)
            if timestamp is None:
                self.logger.error("UDP_MALFORMED %s (%s)" % (text, addr))
                continue
            self.logger.info("UDP_SUCCESS %s (%s)" % (text, addr))
            self._timestamps.discard(timestamp)

    @staticmethod
    def _parse_timestamp(data: bytes) -> "float | None":
        """Extract the send timestamp from a received datagram.

        Args:
            data: Raw datagram bytes.

        Returns:
            The timestamp as a float, or ``None`` when ``data`` has no
            ``/``-separated field that parses as a number.
        """
        fields = data.split(b"/")
        if len(fields) < 2:
            return None
        # Send() appends the timestamp as the final field, so a payload that
        # itself contains "/" is still handled.  The second field is tried
        # next because that is where this client historically looked.
        for raw in (fields[-1], fields[1]):
            try:
                return float(raw.decode())
            except (UnicodeDecodeError, ValueError):
                continue
        return None

    def staleCheck(self):
        """Log and forget datagrams that went unanswered for too long.

        Runs at most once per ``STALE_CHECK_INTERVAL`` seconds.  Each
        timestamp at least ``REPLY_TIMEOUT`` seconds old is logged as
        ``UDP_FAIL`` and removed from the pending set.
        """
        now = time.time()
        if now - self._last_check < self.STALE_CHECK_INTERVAL:
            return
        self._last_check = now
        # Age is measured against the current time; measuring against the
        # previous scan would delay detection by the gap between scans.
        expired = sorted(
            sent_at
            for sent_at in self._timestamps
            if now - sent_at >= self.REPLY_TIMEOUT
        )
        for sent_at in expired:
            self.logger.error("UDP_FAIL %f" % sent_at)
        self._timestamps.difference_update(expired)