我正在做一个 pub/sub zmq 接收器,一个 pub,多个 sub,它可以在异步环境中从一个发布者接收消息。 基于异步的接收器不接收任何消息。
接收器程序的片段:
async def zmq_listener(sub, stop_event):
global mode
while not stop_event.is_set():
msg = await sub.recv_multipart()
print(msg)
if len(msg) == 1:
print(msg[0].decode())
######################################
### MAIN PROGRAM STARTS HERE
async def main():
tasks = []
tasks.append(asyncio.create_task(other_routine1(abc, stop_event)))
if doZMQ:
print("ZMQ Mode is enabled")
ctx = zmq.asyncio.Context()
sub = ctx.socket(zmq.SUB)
ip = 'tcp://127.0.0.1:5559'
sub.connect(ip)
sub.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics
tasks.append(asyncio.create_task(zmq_listener(sub, stop_event)))
await asyncio.gather(*tasks)
stop_event = asyncio.Event()
try:
uvloop.install()
asyncio.run(main(), debug=False)
except KeyboardInterrupt:
print("Program interrupted by user")
asyncio.run(stop())
测试接收器的发送应用程序如下:
import zmq
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
ip = 'tcp://127.0.0.1:5559'
pub.bind(ip)
pub.send_multipart([b"SHOW"])
pub.send_multipart([b"P1"])
发布者在订阅者有机会连接和订阅消息之前发送消息,让我们尝试添加一个短暂的延迟,比如 1 秒。
import zmq
import time
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
ip = 'tcp://127.0.0.1:5559'
pub.bind(ip)
time.sleep(1)
pub.send_multipart([b"SHOW"])
pub.send_multipart([b"P1"])