我正在尝试制作一个从三个端口接收数据包的 UDP 服务器。 在其中一个端口上,服务器将收到的数据包回显给客户端。在另一 两个端口,它发送一个测量数据包,并根据在两个监控端口上从客户端发送和接收数据包所花费的时间计算 RTT。 我在 mininet 仿真环境中同时运行 UDP 服务器和客户端。但是,生成的 RTT 与路径上配置的链路延迟不符。我不确定问题出在我使用 asyncio 的方式还是客户端。
下面是我的客户代码:
import socket
import time
import select
import argparse
import csv
import math
import netifaces
import numpy as np
def get_address(hostname):
intfName = hostname+'-eth0'
addrs = netifaces.ifaddresses(intfName)
if netifaces.AF_INET6 in addrs:
ipv6_addrs = addrs[netifaces.AF_INET6]
ipv6_addr = [addr['addr'] for addr in ipv6_addrs if not addr['addr'].startswith('fe80')]
return ipv6_addr[0]
return None
class UDPSocketPool:
def __init__(self, ip_addr, port, num_sockets):
self.ipaddr = ip_addr
self.port = port
self.num_sockets = num_sockets
self.sockets = []
# Create a pool of sockets
for i in range(num_sockets):
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((ip_addr, port))
self.sockets.append(sock)
def sendto(self, data, address):
# Select a socket from the pool to send the data
sock = self.sockets[hash(address) % self.num_sockets]
sock.sendto(data, address)
def recvfrom(self):
rlist, _, _ = select.select(self.sockets, [], [])
response = []
for sock in rlist:
try:
data, server = sock.recvfrom(4096)
except BlockingIOError as e:
if e.errno == socket.errno.EWOULDBLOCK or e.errno == socket.errno.EAGAIN:
pass
else:
print(f"Socket error: {e}")
else:
r_time = time.monotonic()
receive_time = time.time()
serv_addr, serv_port, i, o = server
response.append((r_time, receive_time, data,
serv_addr, serv_port))
return response
def close(self):
# Close all sockets in the pool
for sock in self.sockets:
sock.close()
if __name__ == "__main__":
# Set up command line arguments
arg_parser = argparse.ArgumentParser(
description='client emulator',
usage='%(prog)s [ -d duration -c csv-output-file]')
arg_parser.add_argument('-d', dest='duration',
help='Connection duration of the client',
type=int,
default=300)
arg_parser.add_argument('-n', dest='hostname',
help='Hostname of the client',
type=str,
default=None)
arg_parser.add_argument('-c', dest='output',
help='File to write CSV output of RTT',
type=argparse.FileType('w'),
default=None)
args = arg_parser.parse_args()
if args.output is None:
raise SystemExit("No filename supplied for the output")
if args.hostname is None:
raise SystemExit("Host name not provided for client")
# Set up client socket
server_address = ('55::1', 12345)
server_address_p1 = ('55::4', 12346)
server_address_p2 = ('55::5', 12347)
client_addr = get_address(args.hostname)
print(f"IP address for host {args.hostname} is {client_addr}")
pool = UDPSocketPool(client_addr, 11111, 3)
message_counter = 0
# Set up variables for tracking consecutive counts and times
last_thirty_rtts = []
init_msm = False
# Set up CSV writer and write headers
with args.output as f:
writer = csv.writer(f, delimiter='|')
writer.writerow(["Message", "Receive Time", "RTT", "Received"])
# Loop through messages
start_time = time.monotonic()
end_time = start_time + args.duration
while time.monotonic() < end_time:
s_time = time.monotonic()
send_time = time.time()
message = f"{client_addr},{send_time},{message_counter}"
pool.sendto(message.encode(), server_address)
if init_msm is False:
pool.sendto(message.encode(), server_address_p1)
pool.sendto(message.encode(), server_address_p2)
init_msm = True
for resp in pool.recvfrom():
(r_time, receive_time, data, serv_addr, serv_port) = resp
if (serv_addr, serv_port) == server_address:
rtt = (r_time - s_time) * 1000
received = True
last_thirty_rtts.append(rtt)
# Write row to CSV
writer.writerow([message, receive_time, rtt, received])
message_counter += 1
if (serv_addr, serv_port) == server_address_p1:
#msm_msg = f"Pong,{client_addr},{serv_addr},{time.time()}"
pool.sendto(data, server_address_p1)
if (serv_addr, serv_port) == server_address_p2:
#msm_msg = f"Pong,{client_addr},{serv_addr},{time.time()}"
pool.sendto(data, server_address_p2)
if len(last_thirty_rtts) == 30:
p70val = np.percentile(last_thirty_rtts, 70)
if (p70val > 60.0):
print(f"High RTT experienced for 30% of traffic in the last 30 seconds")
break
last_thirty_rtts.clear()
t_delta = time.monotonic() - s_time
if t_delta < 1.0:
duration = 1.0 - t_delta
time.sleep(duration)
这是下面的服务器代码:
import asyncio
import sys
import time
from queue import Queue
from threading import Thread
from typing import List, Tuple, Dict
import os
from datetime import datetime
import concurrent.futures
class ServerProtocol(asyncio.DatagramProtocol):
def __init__(self, clients):
self.clients = clients
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
receive_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
client_addr = addr[0]
response = f"Received {message} at {receive_time}".encode()
self.transport.sendto(response, addr)
if client_addr not in self.clients:
self.clients[client_addr] = [(None, None), (None, None)]
class P1Protocol(asyncio.DatagramProtocol):
def __init__(self, clients, srcAddr):
self.alpha = 0.125
self.clients = clients
self.srcAddr = srcAddr
self.first = 0
self.message_counter = 0
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
msg_list = message.split(",")
client_addr = addr[0]
s_time = time.time()
if len(msg_list) == 3:
self.first = 1
else:
r_time = time.time()
pkt_s_time = float(msg_list[4])
rtt = (r_time - pkt_s_time) * 1000
ertt, rtt_prev = self.clients[client_addr][0]
if ertt is None:
ertt = rtt
self.clients[client_addr][0] = (ertt, rtt)
else:
ertt = ((1 - self.alpha) * ertt) + (self.alpha * rtt)
self.clients[client_addr][0] = (ertt, rtt)
ping = f"Ping,{client_addr},{self.srcAddr},{self.message_counter},{s_time}"
self.transport.sendto(ping.encode(), addr)
self.message_counter += 1
def send_packet(self, addr):
ping = f"Ping,{addr},{self.srcAddr},{time.monotonic()}"
self.transport.sendto(ping.encode(), (addr, 11111))
class P2Protocol(asyncio.DatagramProtocol):
def __init__(self, clients, srcAddr):
self.alpha = 0.125
self.clients = clients
self.srcAddr = srcAddr
self.first = 0
self.message_counter = 0
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
msg_list = message.split(",")
client_addr = addr[0]
s_time = time.time()
if len(msg_list) == 3:
self.first = 1
else:
r_time = time.time()
pkt_s_time = float(msg_list[4])
rtt = (r_time - pkt_s_time) * 1000
ertt, rtt_prev = self.clients[client_addr][1]
if ertt is None:
ertt = rtt
self.clients[client_addr][1] = (ertt, rtt)
else:
ertt = ((1 - self.alpha) * ertt) + (self.alpha * rtt)
self.clients[client_addr][1] = (ertt, rtt)
ping = f"Ping,{client_addr},{self.srcAddr},{self.message_counter},{s_time}"
self.transport.sendto(ping.encode(), addr)
self.message_counter += 1
def send_packet(self, addr):
ping = f"Ping,{addr},{self.srcAddr},{time.monotonic()}"
self.transport.sendto(ping.encode(), (addr, 11111))
async def Server(clients: Dict[str, list],
loop: asyncio.AbstractEventLoop) -> None:
transport, protocol = await loop.create_datagram_endpoint(
lambda: ServerProtocol(clients),
local_addr=('55::1', 12345))
try:
while True:
await asyncio.sleep(0.001)
except asyncio.CancelledError:
transport.close()
def ServerWorker(clients: Dict[str, list],
loop: asyncio.AbstractEventLoop) -> None:
loop.run_until_complete(Server(clients,loop))
async def ServerMsmWorker1(clients: Dict[str, list],
srcAddr: str,
port: int,
loop: asyncio.AbstractEventLoop) -> None:
P1transport, protocol = await loop.create_datagram_endpoint(
lambda: P1Protocol(clients, srcAddr),
local_addr=(srcAddr, port))
try:
while True:
await asyncio.sleep(0.001)
except asyncio.CancelledError:
P1transport.close()
async def ServerMsmWorker2(clients: Dict[str, list],
srcAddr: str,
port: int,
loop: asyncio.AbstractEventLoop) -> None:
P2transport, protocol = await loop.create_datagram_endpoint(
lambda: P2Protocol(clients, srcAddr),
local_addr=(srcAddr, port))
try:
while True:
await asyncio.sleep(0.001)
except asyncio.CancelledError:
P2transport.close()
def msmWorker1(clients: Dict[str, list],
srcAddr: str,
port: int) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(ServerMsmWorker1(clients, srcAddr, port, loop))
def msmWorker2(clients: Dict[str, list],
srcAddr: str,
port: int) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(ServerMsmWorker2(clients, srcAddr, port, loop))
async def main():
clients = {}
loop = asyncio.get_running_loop()
t1 = loop.create_task(Server(clients, loop))
t2 = loop.create_task(ServerMsmWorker1(clients, "55::4", 12346, loop))
t3 = loop.create_task(ServerMsmWorker2(clients, "55::5", 12347, loop))
await asyncio.gather(t1, t2, t3)
def tmain() -> None:
clients = {}
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
gameServer = Thread(target=ServerWorker,
args=(clients, loop),
daemon=True,
name=f"gameServer",)
gameServer.start()
P1Thread = Thread(target=msmWorker1,
args=(clients, "55::4", 12346),
daemon=True,
name=f"P1Thread",)
P2Thread = Thread(target=msmWorker2,
args=(clients, "55::5", 12347),
daemon=True,
name=f"P2Thread",)
P2Thread.start()
P1Thread.start()
while True:
time.sleep(1.0)
if __name__ == "__main__":
#tmain()
asyncio.run(main())
一个简单版本的服务器使用 asyncio 只有一个套接字如下所示 预期
import asyncio
import time
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
receive_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
port = addr[1]
response = f"Received {message} at {receive_time}".encode()
self.transport.sendto(response, addr)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
print("Starting the server")
listen = loop.create_datagram_endpoint(
EchoServerProtocol, local_addr=('55::1', 12345))
transport, protocol = loop.run_until_complete(listen)
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
print("server is shutting down.")
finally:
transport.close()
loop.close()
如果我能得到任何关于为什么 asyncio 和套接字没有按预期运行的指示,我将不胜感激。