使用 Python asyncio 同时从三个端口接收数据包的 UDP 服务器

问题描述 投票:0回答:0

我正在尝试制作一个从三个端口接收数据包的 UDP 服务器。 在其中一个端口上,服务器将收到的数据包回显给客户端。在另一 两个端口,它发送一个测量数据包,并根据在两个监控端口上从客户端发送和接收数据包所花费的时间计算 RTT。 我在 mininet 仿真环境中同时运行 UDP 服务器和客户端。但是,生成的 RTT 与路径上配置的链路延迟不符。我不确定问题出在我使用 asyncio 的方式还是客户端。

下面是我的客户代码:

import socket
import time
import select
import argparse
import csv
import math
import netifaces
import numpy as np


def get_address(hostname):
    intfName = hostname+'-eth0'
    addrs = netifaces.ifaddresses(intfName)
    if netifaces.AF_INET6 in addrs:
        ipv6_addrs = addrs[netifaces.AF_INET6]
        ipv6_addr = [addr['addr'] for addr in ipv6_addrs if not addr['addr'].startswith('fe80')]
        return ipv6_addr[0]
    return None


class UDPSocketPool:
    def __init__(self, ip_addr, port, num_sockets):
        self.ipaddr = ip_addr
        self.port = port
        self.num_sockets = num_sockets
        self.sockets = []

        # Create a pool of sockets
        for i in range(num_sockets):
            sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((ip_addr, port))
            self.sockets.append(sock)

    def sendto(self, data, address):
        # Select a socket from the pool to send the data
        sock = self.sockets[hash(address) % self.num_sockets]
        sock.sendto(data, address)

    def recvfrom(self):
        rlist, _, _ = select.select(self.sockets, [], [])
        response = []
        for sock in rlist:
            try:
                data, server = sock.recvfrom(4096)
            except BlockingIOError as e:
                if e.errno == socket.errno.EWOULDBLOCK or e.errno == socket.errno.EAGAIN:
                    pass
                else:
                    print(f"Socket error: {e}")
            else:
                r_time = time.monotonic()
                receive_time = time.time()
                serv_addr, serv_port, i, o = server
                response.append((r_time, receive_time, data,
                                 serv_addr, serv_port))
        return response

    def close(self):
        # Close all sockets in the pool
        for sock in self.sockets:
            sock.close()


if __name__ == "__main__":

    # Set up command line arguments
    arg_parser = argparse.ArgumentParser(
            description='client emulator',
            usage='%(prog)s [ -d duration  -c csv-output-file]')
    arg_parser.add_argument('-d', dest='duration',
                            help='Connection duration of the client',
                            type=int,
                            default=300)
    arg_parser.add_argument('-n', dest='hostname',
                            help='Hostname of the client',
                            type=str,
                            default=None)
    arg_parser.add_argument('-c', dest='output',
                            help='File to write CSV output of RTT',
                            type=argparse.FileType('w'),
                            default=None)
    args = arg_parser.parse_args()

    if args.output is None:
        raise SystemExit("No filename supplied for the output")

    if args.hostname is None:
        raise SystemExit("Host name not provided for client")

    # Set up client socket
    server_address = ('55::1', 12345)
    server_address_p1 = ('55::4', 12346)
    server_address_p2 = ('55::5', 12347)

    client_addr = get_address(args.hostname)
    print(f"IP address for host {args.hostname} is {client_addr}")

    pool = UDPSocketPool(client_addr, 11111, 3)
    message_counter = 0
    # Set up variables for tracking consecutive counts and times
    last_thirty_rtts = [] 
    init_msm = False

    # Set up CSV writer and write headers
    with args.output as f:
        writer = csv.writer(f, delimiter='|')
        writer.writerow(["Message", "Receive Time", "RTT", "Received"])

        # Loop through messages
        start_time = time.monotonic()
        end_time = start_time + args.duration
        while time.monotonic() < end_time:
            s_time = time.monotonic()
            send_time = time.time()
            message = f"{client_addr},{send_time},{message_counter}"
            pool.sendto(message.encode(), server_address)
            if init_msm is False:
                pool.sendto(message.encode(), server_address_p1)
                pool.sendto(message.encode(), server_address_p2)
                init_msm = True

            for resp in pool.recvfrom():
                (r_time, receive_time, data, serv_addr, serv_port) = resp
                if (serv_addr, serv_port) == server_address:
                    rtt = (r_time - s_time) * 1000
                    received = True
                    last_thirty_rtts.append(rtt)
                    # Write row to CSV
                    writer.writerow([message, receive_time, rtt, received])
                    message_counter += 1

                if (serv_addr, serv_port) == server_address_p1:
                    #msm_msg = f"Pong,{client_addr},{serv_addr},{time.time()}"
                    pool.sendto(data, server_address_p1)

                if (serv_addr, serv_port) == server_address_p2:
                    #msm_msg = f"Pong,{client_addr},{serv_addr},{time.time()}"
                    pool.sendto(data, server_address_p2)

            if len(last_thirty_rtts) == 30:
                p70val = np.percentile(last_thirty_rtts, 70)
                if (p70val > 60.0):
                    print(f"High RTT experienced for 30% of traffic in the last 30 seconds")
                    break
                last_thirty_rtts.clear()

            t_delta = time.monotonic() - s_time
            if t_delta < 1.0:
                duration = 1.0 - t_delta
                time.sleep(duration)

这是下面的服务器代码:

import asyncio
import sys
import time
from queue import Queue
from threading import Thread
from typing import List, Tuple, Dict
import os
from datetime import datetime
import concurrent.futures


class ServerProtocol(asyncio.DatagramProtocol):
    def __init__(self, clients):
        self.clients = clients

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        receive_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        client_addr = addr[0]
        response = f"Received {message} at {receive_time}".encode()
        self.transport.sendto(response, addr)
        if client_addr not in self.clients:
            self.clients[client_addr] = [(None, None), (None, None)]


class P1Protocol(asyncio.DatagramProtocol):
    def __init__(self, clients, srcAddr):
        self.alpha = 0.125
        self.clients = clients
        self.srcAddr = srcAddr
        self.first = 0
        self.message_counter = 0

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        msg_list = message.split(",")
        client_addr = addr[0]
        s_time = time.time()
        if len(msg_list) == 3:
            self.first = 1
        else:
            r_time = time.time()
            pkt_s_time = float(msg_list[4])
            rtt = (r_time - pkt_s_time) * 1000
            ertt, rtt_prev = self.clients[client_addr][0]
            if ertt is None:
                ertt = rtt
                self.clients[client_addr][0] = (ertt, rtt)
            else:
                ertt = ((1 - self.alpha) * ertt) + (self.alpha * rtt)
                self.clients[client_addr][0] = (ertt, rtt)
        ping = f"Ping,{client_addr},{self.srcAddr},{self.message_counter},{s_time}"
        self.transport.sendto(ping.encode(), addr)
        self.message_counter += 1

    def send_packet(self, addr):
        ping = f"Ping,{addr},{self.srcAddr},{time.monotonic()}"
        self.transport.sendto(ping.encode(), (addr, 11111))


class P2Protocol(asyncio.DatagramProtocol):
    def __init__(self, clients, srcAddr):
        self.alpha = 0.125
        self.clients = clients
        self.srcAddr = srcAddr
        self.first = 0
        self.message_counter = 0

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        msg_list = message.split(",")
        client_addr = addr[0]
        s_time = time.time()
        if len(msg_list) == 3:
            self.first = 1
        else:
            r_time = time.time()
            pkt_s_time = float(msg_list[4])
            rtt = (r_time - pkt_s_time) * 1000
            ertt, rtt_prev = self.clients[client_addr][1]
            if ertt is None:
                ertt = rtt
                self.clients[client_addr][1] = (ertt, rtt)
            else:
                ertt = ((1 - self.alpha) * ertt) + (self.alpha * rtt)
                self.clients[client_addr][1] = (ertt, rtt)
        ping = f"Ping,{client_addr},{self.srcAddr},{self.message_counter},{s_time}"
        self.transport.sendto(ping.encode(), addr)
        self.message_counter += 1

    def send_packet(self, addr):
        ping = f"Ping,{addr},{self.srcAddr},{time.monotonic()}"
        self.transport.sendto(ping.encode(), (addr, 11111))



async def Server(clients: Dict[str, list],
                 loop: asyncio.AbstractEventLoop) -> None:
    transport, protocol = await loop.create_datagram_endpoint(
            lambda: ServerProtocol(clients),
            local_addr=('55::1', 12345))
    try:
        while True:
            await asyncio.sleep(0.001)
    except asyncio.CancelledError:
        transport.close()


def ServerWorker(clients: Dict[str, list],
                 loop: asyncio.AbstractEventLoop) -> None:
    loop.run_until_complete(Server(clients,loop))


async def ServerMsmWorker1(clients: Dict[str, list],
                           srcAddr: str,
                           port: int,
                           loop: asyncio.AbstractEventLoop) -> None:

    P1transport, protocol = await loop.create_datagram_endpoint(
            lambda: P1Protocol(clients, srcAddr),
            local_addr=(srcAddr, port))

    try:
        while True:
            await asyncio.sleep(0.001)
    except asyncio.CancelledError:
        P1transport.close()


async def ServerMsmWorker2(clients: Dict[str, list],
                           srcAddr: str,
                           port: int,
                           loop: asyncio.AbstractEventLoop) -> None:

    P2transport, protocol = await loop.create_datagram_endpoint(
            lambda: P2Protocol(clients, srcAddr),
            local_addr=(srcAddr, port))

    try:
        while True:
            await asyncio.sleep(0.001)
    except asyncio.CancelledError:
        P2transport.close()


def msmWorker1(clients: Dict[str, list],
               srcAddr: str,
               port: int) -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(ServerMsmWorker1(clients, srcAddr, port, loop))


def msmWorker2(clients: Dict[str, list],
               srcAddr: str,
               port: int) -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(ServerMsmWorker2(clients, srcAddr, port, loop))



async def main():
    clients = {}
    loop = asyncio.get_running_loop()
    t1 = loop.create_task(Server(clients, loop))
    t2 = loop.create_task(ServerMsmWorker1(clients, "55::4", 12346, loop))
    t3 = loop.create_task(ServerMsmWorker2(clients, "55::5", 12347, loop))

    await asyncio.gather(t1, t2, t3)
    


def tmain() -> None:

    clients = {}
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    gameServer = Thread(target=ServerWorker,
                        args=(clients, loop),
                        daemon=True,
                        name=f"gameServer",)
    gameServer.start()

    P1Thread = Thread(target=msmWorker1,
                      args=(clients, "55::4", 12346),
                      daemon=True,
                      name=f"P1Thread",)

    P2Thread = Thread(target=msmWorker2,
                      args=(clients, "55::5", 12347),
                      daemon=True,
                      name=f"P2Thread",)
    P2Thread.start()
    P1Thread.start()
    while True:
        time.sleep(1.0)


if __name__ == "__main__":
    #tmain()
    asyncio.run(main())

一个简单版本的服务器使用 asyncio 只有一个套接字如下所示 预期

import asyncio
import time


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        receive_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        port = addr[1]
        response = f"Received {message} at {receive_time}".encode()
        self.transport.sendto(response, addr)


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    print("Starting the server")
    listen = loop.create_datagram_endpoint(
            EchoServerProtocol, local_addr=('55::1', 12345))
    transport, protocol = loop.run_until_complete(listen)

    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        print("server is shutting down.")
    finally:
        transport.close()
        loop.close()

如果我能得到任何关于为什么 asyncio 和套接字没有按预期运行的指示,我将不胜感激。

python sockets udp
© www.soinside.com 2019 - 2024. All rights reserved.