pyzmq代理在订阅多个进程后处于奇怪的状态

问题描述 投票:1回答:1

我在pyzmq中遇到了一个奇怪的问题。这是该代理的代码:

import zmq
context = zmq.Context.instance()

frontend_socket = context.socket(zmq.XSUB)
frontend_socket.bind("tcp://0.0.0.0:%s" % sub_port)

backend_socket = context.socket(zmq.XPUB)
backend_socket.bind("tcp://0.0.0.0:%s" % pub_port)

zmq.proxy(frontend_socket, backend_socket)

我正在使用该代理在6台不同计算机上运行的约50个进程之间发送消息。主题的总数约为1,000,但是由于多个进程可以收听相同的主题,因此订阅的总数约为10,000。

在正常情况下,这种方法非常有效,只要一个进程发布代理并且至少有一个其他进程订阅了该主题,邮件就可以正确地通过代理。无论发布者或订阅者是首先启动的,它都有效。

但是在某个时间点,当我们开始一个新进程(我们称它为X)时,它开始表现出奇怪的状态。已经连接的所有内容都可以正常工作,但是如果发布者在订阅者之前连接,则我们连接的新流程只能使消息通过。 X可以是正常运行的任何进程,也可以来自任何计算机,并且结果相同。当我们进入这种状态时,杀死X会使一切重新工作,再次启动将使其失败。如果我们停止其他进程然后再启动X,它会很好地工作(因此,它与X的代码无关)。

我不确定我们是否可以达到ZMQ的某些限制?我读过一些人的例子,这些人似乎比我们拥有更多的流程,订阅等。这可能是我们应该在代理服务器上设置的某些选项,到目前为止,这里是我们尝试过但未成功的选项:

  • 在frontend_socket上更改RCVHWM
  • 更改SNDHWM的后端套接字
  • 在后端套接字上设置XPUB_VERBOSE
  • 在后端套接字上设置XPUB_VERBOSER

这里是我们如何将消息发布到代理的示例代码:

topic = "test"
message = {"test": "test"}

context = zmq.Context.instance()
socket = context.socket(zmq.PUB)
socket.connect("tcp://1.2.3.4:1234")
while True:
    time.sleep(1)
    socket.send_multipart([topic.encode(), json.dumps(message).encode()])

这是我们如何从代理订阅消息的示例代码:

topic = "test"
context = zmq.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://1.2.3.4:5678")
socket.subscribe(topic)

while True:
    multi_part = socket.recv_multipart()
    [topic, message] = multi_part
    print(topic.decode(), message.decode())

有人见过类似的问题吗?我们可以做些什么来避免代理进入这种状态?

谢谢!

python proxy zeromq pyzmq
1个回答
0
投票

使所有发布者(代理和发布过程)XPUB(+ sockopt详细/详细信息),然后在轮询循环中从发布者套接字读取。订阅消息的第一个字节将告诉您消息是sub / unsub,然后是主题/主题。如果您用时间戳记记录了所有这些信息,它应该告诉您哪个组件有故障(可能是三个组件中的任何一个),并提供修复帮助。

到达发布者(XPUB)的订阅消息的格式为

  • 订阅[0x01][topic]
  • 取消订阅[0x00][topic]
© www.soinside.com 2019 - 2024. All rights reserved.