我有一个Raspberry Pi客户端和4个Raspberry Pi服务器。我希望客户端同时向所有4个服务器发送字符串消息以捕获图像。现在,我正在按顺序使用以下内容。
socket.send(capture)
socket1.send(capture)
socket2.send(capture)
socket3.send(capture)
将更改为类似发布/订阅模型会改善客户端接收消息的程度吗?我希望这4个客户端彼此之间在5毫秒或更短的时间内获取捕获消息。
虽然我们获得零保修,但我们可能会在适当的方向上执行一些步骤。如果是ZeroMQ的新手,在深入了解更多细节之前,请随时阅读posts here和至少“ ZeroMQ Principles in less than Five Seconds”:
import zmq; print( zmq.zmq_version() ) # INF:
aCtx = zmq.Context( 4 ) # request 4-I/O-threads
aPUB = aCtx.socket( zmq.PUB ) # PUB-instance
aPUB.setsockopt( zmq.LINGER, 0 ) # avoid deadlock on close
aPUB.setsockopt( zmq.SNDBUF, 3 * PayLoadSIZE ) # FullHD ~ 6 MB, 4K ~ ...
aPUB.setsockopt( zmq.SNDHWM, aNumOfPicsInQUEUE ) # 1, ~3? ~10?, !1000 ...
aPUB.setsockopt( zmq.IMMEDIATE, 1 ) # ignore L1/L2-incomplete(s)
aPUB.setsockopt( zmq.CONFLATE, 1 ) # do not re-send "old"
aPUB.bind( <transport-class>:<port#> ) # tcp:? udp-multicast?
#-----------------------------------------------------------------------------[RTO]
# may like to set aPayLOAD = gzip.compress( dill.dumps( capture ), compressionLEVEL )
# yields reduced sizes of the serialised <capture> data
# at costs of about ~30~60 [ms] on either side
# which may lower the network traffic and .SNDBUF-sizing issues
#----------------------------------------------------------------------
while <any reason>:
try:
aPUB.send( aPayLOAD, zmq.NOBLOCK )
except:
# handle as per errno ...
finally:
pass
#----------------------------------------------------------------------
aPUB.close()
aCtx.term()
import zmq; print( zmq.zmq_version() ) # INF:
aCtx = zmq.Context() # request 4-I/O-threads
aSUB = aCtx.socket( zmq.SUB ) # SUB-instance
aSUB.setsockopt( zmq.LINGER, 0 ) # avoid deadlock on close
aSUB.setsockopt( zmq.RCVBUF, 3 * PayLoadSIZE ) # FullHD ~ 6 MB, 4K ~ ...
aSUB.setsockopt( zmq.RCVHWM, aNumOfPicsInQUEUE ) # 1, ~3? ~10?, !1000 ...
aSUB.setsockopt( zmq.IMMEDIATE, 1 ) # ignore L1/L2-incomplete(s)
aSUB.setsockopt( zmq.CONFLATE, 1 ) # do not re-recv "old"
aSUB.setsockopt( zmq.SUBSCRIBE, "" ) # do subscribe to whatever comes
aSUB.connect( <transport-class>:<port#> ) # tcp:? udp-multicast?
#-----------------------------------------------------------------------------[RTO]
while <any reason>:
try:
if ( aSUB.poll( zmq.POLLIN, 0 ) == 0 ):
# nothing in the receiving Queue ready-to-.recv()
# sleep()
# do some system work etc
else:
aPayLOAD = aSUB.recv( zmq.NOBLOCK )
#--------------------------------------------------------
# decompress / deserialise the original object
# capture = dill.loads( gzip.decompress( aPayLOAD ) )
#--------------------------------------------------------
# PROCESS THE DATA :
# ...
except:
# handle as per errno ...
finally:
pass
#----------------------------------------------------------------------
aSUB.close()
aCtx.term()