ZeroMQ:推/拉时延迟非常高

问题描述 投票:0回答:1

摘要

[我看到使用zmq.PUSH/PULL套接字的接收器的等待时间非常长。

详细信息

我是ZeroMQ的新手,正在尝试从发送方向接收方发送几兆字节(我的最终目标是从发送方向接收方发送摄像机流)。发送者和接收者是同一本地网络上的不同机器(如果重要的话,一台是Mac,另一台正在运行Ubuntu 18.04)。发送方能够非常快速地发送其数据包(我想它们会被缓存在某些zmq / tcp队列中),但是接收方会非常缓慢地接收它们,并且每个数据包的延迟都会增加。

这里是发件人:

  1 """
  2 Sender
  3 """
  4 import zmq
  5 from time import time
  6 import sys
  7 
  8 addr = "tcp://192.168.86.33:5555"
  9 
 10 context = zmq.Context()
 11 socket = context.socket(zmq.PUSH)
 12 socket.bind(addr)
 13 
 14 num = 0
 15 while True:
 16     frame = [0] * 1000000
 17     ts = time()
 18     num += 1
 19     socket.send_pyobj(dict(num=num, frame=frame, ts=ts))
 20 
 21     delay = time() - ts
 22     print("Sender:: pkt_num: {}, delay: {}".format(num, delay))

和接收者:

  1 """                                                                              
  2 Receiver                                                                         
  3 """                                                                              
  4 import zmq                                                                       
  5 from time import time                                                            
  6                                                                                  
  7 addr = "tcp://192.168.86.33:5555"                                                
  8 context = zmq.Context()                                                          
  9                                                                                  
 10 socket = context.socket(zmq.PULL)                                                
 11 socket.connect(addr)                                                             
 12                                                                                  
 13 while True:                                                                      
 14     msg = socket.recv_pyobj()                                                    
 15     frame = msg['frame']                                                         
 16     num = msg['num']                                                             
 17     ts = msg['ts']                                                               
 18                                                                                  
 19     delay = time() - ts                                                          
 20                                                                                  
 21     if True:                                                                     
 22         print("Receiver:: pkt_num: {} latency: {}".format(num, delay))             

当我运行此命令时,我看到发送方能够非常快速地发送其数据包:

Sender:: pkt_num: 1, delay: 0.026965618133544922
Sender:: pkt_num: 2, delay: 0.018309354782104492
Sender:: pkt_num: 3, delay: 0.01821303367614746
Sender:: pkt_num: 4, delay: 0.016669273376464844
Sender:: pkt_num: 5, delay: 0.01674652099609375
Sender:: pkt_num: 6, delay: 0.01668095588684082
Sender:: pkt_num: 7, delay: 0.015082836151123047
Sender:: pkt_num: 8, delay: 0.014363527297973633
Sender:: pkt_num: 9, delay: 0.014063835144042969
Sender:: pkt_num: 10, delay: 0.014398813247680664

但是接收者看到很高的分组等待时间:

Receiver:: pkt_num: 1 latency: 0.1272585391998291
Receiver:: pkt_num: 2 latency: 0.2539491653442383
Receiver:: pkt_num: 3 latency: 0.40800905227661133
Receiver:: pkt_num: 4 latency: 0.5737316608428955
Receiver:: pkt_num: 5 latency: 0.7272651195526123
Receiver:: pkt_num: 6 latency: 0.9418754577636719
Receiver:: pkt_num: 7 latency: 1.0799565315246582
Receiver:: pkt_num: 8 latency: 1.228663682937622
Receiver:: pkt_num: 9 latency: 1.3731486797332764
Receiver:: pkt_num: 10 latency: 1.5067603588104248

我尝试在Mac和Linux机器之间交换发送者和接收者,并且看到了相同的行为。由于我的目标是将视频流从发送方发送到接收方,因此这些高延迟使它无法用于此目的。


编辑1

基于user3666197的建议,我编辑了发送方/接收方测试代码以消除一些开销。在发送方,请继续发送相同的字典。我还添加了更多照片。

发件人:

 14 num = 0
 15 frame = [0] * 1000000
 16 payload = dict(num=0, frame=frame, ts=0.0)
 17 while True:
 18     payload['num'] += 1
 19     payload['ts'] = time()
 20     socket.send_pyobj(payload)
 21 
 22     delay = time() - payload['ts']
 23     print("Sender:: pkt_num: {:>6d}, delay: {:6f}" \
 24           .format(payload['num'], delay))

接收器

 10 socket = context.socket(zmq.PULL)
 11 socket.connect(addr)
 12 clk = zmq.Stopwatch()
 13 clk.start()
 14 
 15 while True:
 16     iterT = clk.stop()
 17     clk.start()
 18     msg = socket.recv_pyobj()
 19     rcvT = clk.stop()
 20     delay = time() - msg['ts']
 21 
 22     print("Server:: pkt_num: {:>6d} latency: {:>6f} iterT: {} rcvT: {}" \
 23          .format(msg['num'], delay, iterT, rcvT))
 24     clk.start()

发件人的每个数据包延迟进一步降低。揭示的一个有趣的数据点是,接收器几乎需要0.15s来接收每个数据包,这似乎是主要问题。

Sender:: pkt_num:      1, delay: 1.797830
Sender:: pkt_num:      2, delay: 0.025297
Sender:: pkt_num:      3, delay: 0.019500
Sender:: pkt_num:      4, delay: 0.019500
Sender:: pkt_num:      5, delay: 0.018166
Sender:: pkt_num:      6, delay: 0.017320
Sender:: pkt_num:      7, delay: 0.017258
Sender:: pkt_num:      8, delay: 0.017277
Sender:: pkt_num:      9, delay: 0.017426
Sender:: pkt_num:     10, delay: 0.017340

在接收方的打印中,rcvT是每数据包的接收时间,以微秒为单位。

Server:: pkt_num:      1 latency: 2.395570 iterT: 1 rcvT: 331601
Server:: pkt_num:      2 latency: 0.735229 iterT: 1 rcvT: 137547
Server:: pkt_num:      3 latency: 0.844345 iterT: 1 rcvT: 134385
Server:: pkt_num:      4 latency: 0.991852 iterT: 1 rcvT: 166980
Server:: pkt_num:      5 latency: 1.089429 iterT: 2 rcvT: 117047
Server:: pkt_num:      6 latency: 1.190770 iterT: 2 rcvT: 119466
Server:: pkt_num:      7 latency: 1.348077 iterT: 2 rcvT: 174566
Server:: pkt_num:      8 latency: 1.460732 iterT: 1 rcvT: 129858
Server:: pkt_num:      9 latency: 1.585445 iterT: 2 rcvT: 141948
Server:: pkt_num:     10 latency: 1.717757 iterT: 1 rcvT: 149666
zeromq pyzmq
1个回答
0
投票

Q“ ZeroMQ:PUSH/PULL的延迟很高]]]

不是那么快,不是那么快:


如果从未使用过ZeroMQ,在这里您可以先研究"ZeroMQ Principles in less than Five Seconds",然后再深入研究更多细节]]


The ZeroMQ PUSH/PULL原型是故事的一部分。让我们分解声称的延迟:

1]

我们来衡量一下实际的有效载荷组装成本[us]
aClk = zmq.Stopwatch()
num  = 0    
while True:
    aClk.start() #--------------------------------------- .start() the microsecond timer
    frame     = [0] * 1000000
    ts        = time()
    num      += 1
    aPayLOAD  = dict( num   = num,
                      frame = frame,
                      ts    = ts
                      )
    _1 = aClk.stop() #---------------------------------- .stop() the microsecond timer
    aPayLOAD['ts'] = time()

    aClk.start() #-------------------------------------- .start() the microsecond timer
    socket.send_pyobj( aPayLOAD )
    _2 = aClk.stop() #---------------------------------- .stop() the microsecond timer
    delay        = time() - aPayLOAD['ts']

    print( "Sender:: pkt_num: {0:>6d}, assy: {1:>6d} [us] .send_pyobj(): {2:>6d} [us] 'delay': {3:>10.6f} [s] ".format( num, _1, _2, delay ) )

2]

在接收端做同样的事情,您可以清楚地看到在ZeroMQ接触有效负载的第一个字节之前,在python端消耗了多少[us]

3]接下来是性能调整:

  • 避免python垃圾回收
  • 改善浪费的内存管理(避免创建新实例,更好地将数据注入内存效率高的“处理程序”([numpy.ndarray.data-区域,而不是将重新组成的[[list
  • -s分配给一个python变量)重构代码,以避免昂贵的基于
  • dict
  • 的键-值映射,而是使用紧凑的二进制映射,如struct-模块中可用的(建议的映射既是静态的又是微不足道的) 测试添加压缩步骤(可能会稍微改善网络跳延迟)
  • 4]

  • 现在只有ZeroMQ部分来了:
    可以通过将更多的IO线程添加到
  • Context( nIO_threads )
-instances中的任何一个中来测试改进的性能信封可能会加入
  • .setsockopt( zmq.CONFLATE, 1 )
  • 来忽略所有非“最后”帧,因为视频流可能无法从已经“旧”帧的“后期”重新创建中获得任何附加值]]
    © www.soinside.com 2019 - 2024. All rights reserved.