**我正在开发一个涉及 Python 中 UDP 流的学校项目,其中客户端 (udp_stream.py) 向服务器 (udp_stream_server.py) 发送消息。客户端脚本设计为以特定速率(40/秒)发送指定数量(800)条消息。但是,我在流处理过程中遇到了意外的延迟,并且实际执行时间比预期要长。
我已检查代码是否存在潜在瓶颈并调整了睡眠时间,但问题仍然存在。服务器脚本只是打印传入消息及其相应的客户端地址。
任何有关 UDP 流传输可能比预期慢的原因的见解将不胜感激。我特别寻求有关优化 Python UDP 客户端以获得一致性能的指导。
谢谢! **
`udp_stream.py
from socket import *
import time
def returnTime():
t = time.localtime()
current_time = time.strftime("%H:%M:%S", t)
milliseconds = int((time.time() % 1) * 1000) # Extract milliseconds
return f"{current_time}.{milliseconds:03d}"
def log():
return f"[CLIENT:] {returnTime()}:"
def udp_stream(target_ip, target_port, message_total, message_rate):
client_socket = socket(AF_INET, SOCK_DGRAM)
message_number = 10001
message_size=1470
sent_messages = 0
first_sent = log();
for x in range(message_total):
sent_messages += 1
if sent_messages < message_total:
message = f"{message_number};{'A'*(message_size - len(str(message_number)) -1)}"
client_socket.sendto(message.encode(), (target_ip, target_port))
time.sleep(1 / message_rate)
message_number = message_number + 1
elif sent_messages == message_total:
message = f"{message_number};{'A' * (message_size - len(str(message_number)) - 5)}####"
client_socket.sendto(message.encode(), (target_ip, target_port))
message_number = message_number + 1
time.sleep(1 / message_rate)
last_sent = log()
print(first_sent, last_sent)
udp_stream('localhost',8080,800,40)`
`udp_stream_server.py
import time
def returnTime():
t = time.localtime()
current_time = time.strftime("%H:%M:%S", t)
milliseconds = int((time.time() % 1) * 1000) # Extract milliseconds
return f"{current_time}.{milliseconds:03d}"
from socket import *
serverPort = 8080
serverSocket = socket(AF_INET, SOCK_DGRAM)
serverSocket.bind(('', serverPort))
print(f'[SERVER: {returnTime()}]: UDP Server has started listening on port: {serverPort}')
while True:
# read client's message and remember client's address (IP and port)
message, clientAddress = serverSocket.recvfrom(1024)
# Print message and client address
print (f"[SERVER: {returnTime()}]: Message from client: ",message.decode())
print (f"[SERVER: {returnTime()}]: Client-IP: ",clientAddress)`
检查代码中是否存在潜在瓶颈。 调整睡眠时间以确保所需的消息速率。 检查了服务器脚本 (udp_stream_server.py),它只是打印传入消息和客户端地址。
如评论中所述,计算发送每条消息的绝对时间并睡眠直到该时间。通过这种方式,操作系统线程调度和处理每条消息的变化可以得到平均。
下面是一个服务器的实现,它接收预期测试参数的消息,然后接收并记录每条消息的时间,以便以后计算统计数据。请注意,如果客户端提前中止或由于 UDP 不可靠并且可能会丢弃或乱序数据包,则会实施 sync 数据包以确保服务器已准备好进行下一次测试并且不会需要重新启动。
import socket
import time
serverPort = 8080
serverSocket = socket.socket(type=socket.SOCK_DGRAM)
serverSocket.bind(('', serverPort))
print(f'UDP Server has started listening on port: {serverPort}')
def receive_header(serverSocket):
while True:
try:
message_total, message_rate = [int(n) for n in serverSocket.recvfrom(1024)[0].split()]
break
except ValueError: # received on a sync
pass
print(f'Receiving {message_total} messages @ {message_rate} messages/second...')
print(f'Expected time {message_total/message_rate:.6f} seconds at intervals of {1/message_rate:.6f} seconds.')
return message_total, message_rate
while True:
message_total, message_rate = receive_header(serverSocket)
times = [time.perf_counter()] # Start time.
for _ in range(message_total):
message, clientAddress = serverSocket.recvfrom(1024)
if message == b'sync':
break # ends "for" and skips "else:"
times.append(time.perf_counter())
else:
total = times[-1] - times[0]
intervals = [times[n+1] - times[n] for n in range(len(times) - 1)]
ave = sum(intervals) / len(intervals)
max_interval = max(intervals)
min_interval = min(intervals)
print(f' Total Time {total:9.6f}')
print(f' Minimum Interval {min_interval:9.6f}')
print(f' Average Interval {ave:9.6f}')
print(f' Maximum Interval {max_interval:9.6f}')
下面是该服务器的客户端,它发送测试要求并根据消息速率在特定的绝对时间发送消息:
import socket
import time
import sys
def send_header(client_socket, target_ip, target_port, message_total, message_rate):
client_socket.sendto(b'sync', (target_ip, target_port)) # make ready to receive header
client_socket.sendto(f'{message_total} {message_rate}'.encode(), (target_ip, target_port))
def udp_stream(target_ip, target_port, message_total, message_rate):
client_socket = socket.socket(type=socket.SOCK_DGRAM)
send_header(client_socket, target_ip, target_port, message_total, message_rate)
message = b"size shouldn't matter"
start = time.perf_counter()
for n in range(message_total):
next_send = start + (n + 1) / message_rate # time to send next message
sleep_time = next_send - time.perf_counter()
if sleep_time < 0: # if time already passed, log and skip sleep
print('!', end='', flush=True)
else:
time.sleep(sleep_time)
client_socket.sendto(message, (target_ip, target_port))
print(f'\ntotal time {time.perf_counter() - start}')
rate = int(sys.argv[1])
udp_stream('localhost', 8080, 10 * rate, rate)
费率 1、10、40、1000、3000 的运行示例:
UDP Server has started listening on port: 8080
Receiving 10 messages @ 1 messages/second...
Expected time 10.000000 seconds at intervals of 1.000000 seconds.
Total Time 10.000234
Minimum Interval 0.999843
Average Interval 1.000023
Maximum Interval 1.000573
Receiving 100 messages @ 10 messages/second...
Expected time 10.000000 seconds at intervals of 0.100000 seconds.
Total Time 10.000396
Minimum Interval 0.099620
Average Interval 0.100004
Maximum Interval 0.100455
Receiving 400 messages @ 40 messages/second...
Expected time 10.000000 seconds at intervals of 0.025000 seconds.
Total Time 10.000506
Minimum Interval 0.024607
Average Interval 0.025001
Maximum Interval 0.025614
Receiving 10000 messages @ 1000 messages/second...
Expected time 10.000000 seconds at intervals of 0.001000 seconds.
Total Time 10.000564
Minimum Interval 0.000390
Average Interval 0.001000
Maximum Interval 0.001461
Receiving 30000 messages @ 3000 messages/second...
Expected time 10.000000 seconds at intervals of 0.000333 seconds.
Total Time 10.698635
Minimum Interval 0.000308
Average Interval 0.000357
Maximum Interval 0.003552
在我的系统上,在预期的绝对时间 (
sleep_time < 0
) 之后仅发送了 1 个速率为 1000 的数据包,但最小/最大间隔的分布较大。在速率 3000 时,系统无法维持该速率,发送 3000 条消息大约需要 10.7 秒,而不是预期的 10 秒。