pyzmq同时发送到多个客户端

问题描述 投票:0回答:1

我有一个Raspberry Pi客户端和4个Raspberry Pi服务器。我希望客户端同时向所有4个服务器发送字符串消息以捕获图像。现在,我正在按顺序使用以下内容。

socket.send(capture)
socket1.send(capture)
socket2.send(capture)
socket3.send(capture)

将更改为类似发布/订阅模型会改善客户端接收消息的程度吗?我希望这4个客户端彼此之间在5毫秒或更短的时间内获取捕获消息。

python networking raspberry-pi pyzmq
1个回答
1
投票

欢迎您来到零之禅:

虽然我们获得零保修,但我们可能会在适当的方向上执行一些步骤。如果是ZeroMQ的新手,在深入了解更多细节之前,请随时阅读posts here和至少“ ZeroMQ Principles in less than Five Seconds”:

客户代码概念模板:

   import zmq;                 print( zmq.zmq_version() ) # INF:

   aCtx = zmq.Context( 4 )                                # request 4-I/O-threads
   aPUB = aCtx.socket( zmq.PUB )                          # PUB-instance
   aPUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aPUB.setsockopt(    zmq.SNDBUF, 3 * PayLoadSIZE )      # FullHD ~ 6 MB, 4K ~ ...
   aPUB.setsockopt(    zmq.SNDHWM, aNumOfPicsInQUEUE )    # 1, ~3? ~10?, !1000 ...
   aPUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aPUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-send "old"
   aPUB.bind( <transport-class>:<port#> )                 # tcp:? udp-multicast?
   #-----------------------------------------------------------------------------[RTO]
   # may like to set aPayLOAD = gzip.compress( dill.dumps( capture ), compressionLEVEL )
   #                 yields reduced sizes of the serialised <capture> data
   #                 at costs of about ~30~60 [ms] on either side
   #                 which may lower the network traffic and .SNDBUF-sizing issues
   #----------------------------------------------------------------------
   while <any reason>:
         try:
              aPUB.send( aPayLOAD, zmq.NOBLOCK )
         except:
              # handle as per errno ...
         finally:
              pass
   #----------------------------------------------------------------------
   aPUB.close()
   aCtx.term()

服务器代码概念模板:

   import zmq;                 print( zmq.zmq_version() ) # INF:

   aCtx = zmq.Context()                                   # request 4-I/O-threads
   aSUB = aCtx.socket( zmq.SUB )                          # SUB-instance
   aSUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aSUB.setsockopt(    zmq.RCVBUF, 3 * PayLoadSIZE )      # FullHD ~ 6 MB, 4K ~ ...
   aSUB.setsockopt(    zmq.RCVHWM, aNumOfPicsInQUEUE )    # 1, ~3? ~10?, !1000 ...
   aSUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aSUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-recv "old"
   aSUB.setsockopt(    zmq.SUBSCRIBE, "" )                # do subscribe to whatever comes
   aSUB.connect( <transport-class>:<port#> )              # tcp:? udp-multicast?
   #-----------------------------------------------------------------------------[RTO]
   while <any reason>:
         try:
              if ( aSUB.poll( zmq.POLLIN, 0 ) == 0 ):
                 # nothing in the receiving Queue ready-to-.recv()
                 # sleep()
                 # do some system work etc
              else:
                 aPayLOAD = aSUB.recv( zmq.NOBLOCK )
                 #--------------------------------------------------------
                 # decompress / deserialise the original object
                 # capture = dill.loads( gzip.decompress( aPayLOAD ) )
                 #--------------------------------------------------------
                 # PROCESS THE DATA :
                 # ...
         except: 
              # handle as per errno ...
         finally:
              pass
   #----------------------------------------------------------------------
   aSUB.close()
   aCtx.term()
© www.soinside.com 2019 - 2024. All rights reserved.