ZMQ-python:一对一的推/拉

问题描述 投票:1回答:1

我正在尝试使用以下代码对zmq进行尝试,但是订阅者将对象一个接一个地获取。

以下是我的PUSH脚本:

# zmq server -- run it once

import zmq
import time
# server
# print(zmq.Context)
ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind('ipc:///tmp/zmqtest')
i=0
while True:
    i+=1
    time.sleep(0.5)
    sock.send_pyobj((i))

以下是PULL脚本:

# zmq client -- run it 2,3 times in parallel

import zmq
ctx = zmq.Context() # create a new context to kick the wheels
sock = ctx.socket(zmq.PULL)
sock.connect('ipc:///tmp/zmqtest')

i=0
while True:
    i+=1
    o = sock.recv_pyobj()
    print('received python object:', o,i)
    if o == 'quit':
        print('exiting.')
        break

我从PULL脚本之一获得以下输出:

received python object: 1 1
received python object: 3 2
received python object: 5 3
received python object: 7 4

如何将对象同时推送到两个脚本?我尝试了PUB / SUB,但这种方式无法正常工作。 (可以检查将PUSH/PULL替换为PUB/SUB

one-to-many zeromq publish-subscribe pyzmq
1个回答
0
投票

[PUB- side:

# zmq PUB-server -- run it once

import zmq
import time

IPC  = 'ipc:///tmp/zmqtest'
ctx  = zmq.Context()
PUB  = ctx.socket( zmq.PUB )
PUB.bind( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
PUB.setsockopt( zmq.LINGER, 0 )
PUB.setsockopt( zmq...        )
#------------------------------------------------------------------------------
i = 0
while True:
    i += 1
    time.sleep( 0.5 )
    sock.send_pyobj( ( i ) )
#------------------------------------------------------------------------------

[SUB-侧(s):

# zmq SUB-client -- run x-times concurrently ( or distributed, if other TransportClasses permit )

import zmq

IPC = 'ipc:///tmp/zmqtest'        # <TransportClass>://<address>, TCP,TIPC,...may follow
ctx = zmq.Context()               # create a new context to kick the wheels
SUB = ctx.socket( zmq.SUB )
SUB.connect( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
SUB.setsockopt( zmq.LINGER,     0 )
SUB.setsockopt( zmq.SUBSCRIBE, "" )
SUB.setsockopt( zmq...            )
#------------------------------------------------------------------------------
i    = 0
aClk = zmq.Stopwatch()
MASK = '(i:{1:_>9d}): After{2:_>+12d} [us] did .recv() a python object:[{0:}]'
while True:
    i += 1
    aClk.start()
    o = sock.recv_pyobj()
    _ = aClk.stop()
    print( MASK.format( repr( o ), i, _ ) )

    if o == 'quit':
        print( 'Will exit.' )
        #--------------------------------------- BE NICE & FAIR TO RESOURCES
        SUB.setsockopt( zmq.UNSUBSCRIBE, "" )
        SUB.disconnect( IPC )
        SUB.close()
        ctx.term()
        #-------------------------------------------------------------------
        break

“是的。我需要将每个对象都发送到两个(或多个)脚本无损失

警告,对此实行零保修。一个人可以构建自己的应用程序级协议来实现这一目标。

© www.soinside.com 2019 - 2024. All rights reserved.