我有一个具有 2 个以太网接口的自定义网络设备。它可以在任一接口上接收 UDP 消息,也可以在两个接口上接收自定义实验协议。一旦它收到消息,它就会处理它们,并将命令/结果转发到另一个接口(在我的例子中它就像防火墙,但我想将讨论保留到一般的网络设备)。
我想测量设备的吞吐量和延迟,因此我将两根以太网电缆从设备连接到 Linux RHEL 8 系统。我创建了一个在linux系统上运行的python 3应用程序,它有3个进程1)1在一个接口上发送数据,2)1在另一个接口上接收处理后的数据,3)1在相反方向发送虚拟数据这样就可以在双向数据负载下进行测量
我正在使用 python 多处理库,并使用套接字库中的 linux udp 套接字以及自定义协议的原始套接字(由于自定义协议,iperf 3 在我的情况下不起作用)。我将如何测量吞吐量和延迟,即批量创建一堆测试数据包,并在从进程 1) 发出每个测试数据包后为其添加时间戳。当进程 1) 发送数据包时,进程 2) 正在接收另一端的数据包并为其添加时间戳。接收到所有批量数据后,它会计算相邻接收数据包之间的差异以获得吞吐量测量值,并计算发送和接收之间的差异以获得延迟测量值。
该问题与多处理/套接字相互作用有关。在机器上,当我从进程 1 调用 sock.sendto(data, (dest, port)) 并从进程 2 调用 sock2.receivefrom(data) 时,它们似乎没有同时运行。看起来进程 1) 发送了所有数据包(我正在尝试使用大小为 1000 字节的数据包,300 个数据包),然后进程 2) 开始接收。我从wireshark跟踪中知道,接收到的数据包在开始处理它们之前实际上已到达linux系统。即使是陌生人进程 3) 也会在进程 2) 发送单个数据包之前发送所有数据包。更糟糕的是,如果数据包总字节数超过 200000,进程 1) 有时会丢失数据包。
下面是我正在做的一些伪代码
import multiprocessing
import time
import socket
def sender(socket, queue, data_list, dest):
timestamps = []
for data in data_list:
socket.sendto(data, dest)
timestamps.append(time.perf_counter())
queue.put(timestamps)
def sender_dummy(socket, queue, data_list, dest):
for data in data_list:
socket.sendto(data, dest)
def main():
## Setup the sockets with appropriate timeouts and interfaces and protocols
....
## Setup the test data
....
## Begin multi-processing
q = multiprocessing.Queue()
task_main = multiprocessing.Process(target=sender, args=(sender_sock, q, test_data, (dest_addr, dest_port)))
task_dummy = multiprocessing.Process(target=sender_dummy, args=(sender_dummy_sock, q, dummy_data, (dest_addr, dest_port)))
timestamps_recv = []
task_dummy.start()
task_main.start()
for i in range(len(test_data)):
recv_sock.receivefrom(TEST_DATA_SIZE)
timestamps_recv.append(time.perf_counter())
task_dummy.join()
task_main.join()
timestamps_send = q.get()
# Calculate latency and throughput
...
任何有关解决丢包问题以及获得发送者和接收者真正并发性的帮助都是值得赞赏的。
您可以尝试以下几件事:
确保套接字不跨进程共享。共享可能会导致不可预测的行为。分叉后,在每个进程中创建单独的套接字实例。
由于同步问题,进程可能会相互阻塞。尝试为每个发送进程使用单独的队列。另外,确保接收者进程一启动就主动监听,而不是在发送者之后同步启动。
尝试使用
socket.setsockopt()
调整套接字缓冲区大小
根据外部措施(可能是 Wireshark)交叉检查时间戳,以确保它们准确地表示网络事件
GIL 可能会影响性能。每个进程都使用自己的 GIL 运行,这会导致处理 I/O 操作时产生开销。
我会考虑以下一般性改变:
对 I/O 密集型任务使用线程,因为线程共享相同的内存空间,并且与进程相比重量更轻。
查看
asyncio
库,它非常适合处理 I/O 操作,并且可以提供更有效的方法来管理并发操作。
这里对异步开始接收进行了调整:
def receiver(recv_sock):
while True:
data, addr = recv_sock.recvfrom(TEST_DATA_SIZE)
if not data:
break
timestamps_recv.append(time.perf_counter())
task_recv = multiprocessing.Process(target=receiver, args=(recv_sock,))
task_recv.start()
希望这有帮助。