ZMQ PUB/SUB asyncio 没有收到任何消息

问题描述 投票:0回答:1

我正在做一个 pub/sub zmq 接收器,一个 pub,多个 sub,它可以在异步环境中从一个发布者接收消息。 基于异步的接收器不接收任何消息。

接收器程序的片段:

async def zmq_listener(sub, stop_event):
    global mode
    while not stop_event.is_set():
        msg = await sub.recv_multipart()
        print(msg)
        if len(msg) == 1:
            print(msg[0].decode())

######################################
### MAIN PROGRAM STARTS HERE 
async def main():
    tasks = []
    tasks.append(asyncio.create_task(other_routine1(abc, stop_event)))

    if doZMQ:
        print("ZMQ Mode is enabled")
        ctx = zmq.asyncio.Context()
        sub = ctx.socket(zmq.SUB)
        ip = 'tcp://127.0.0.1:5559'
        sub.connect(ip)
        sub.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics        
        tasks.append(asyncio.create_task(zmq_listener(sub, stop_event)))

    await asyncio.gather(*tasks)

stop_event = asyncio.Event()

try:
    uvloop.install()
    asyncio.run(main(), debug=False)

except KeyboardInterrupt:
    print("Program interrupted by user")
    asyncio.run(stop())

测试接收器的发送应用程序如下:

import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
ip = 'tcp://127.0.0.1:5559'
pub.bind(ip)

pub.send_multipart([b"SHOW"])
pub.send_multipart([b"P1"])
python python-asyncio pyzmq
1个回答
0
投票

发布者在订阅者有机会连接和订阅消息之前发送消息,让我们尝试添加一个短暂的延迟,比如 1 秒。

import zmq
import time

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
ip = 'tcp://127.0.0.1:5559'
pub.bind(ip)

time.sleep(1)

pub.send_multipart([b"SHOW"])
pub.send_multipart([b"P1"])
© www.soinside.com 2019 - 2024. All rights reserved.