当我阅读 zmq 指南中的“持久订阅者和高水位标记”时,它说“HWM 导致 ØMQ 丢弃它无法放入队列的消息”,但是当我运行该示例时没有消息丢失。按 ctrl+c 终止 durasub.py,然后继续。
durasub.py
import zmq
import time
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.IDENTITY, "Hello")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")
sync = context.socket(zmq.PUSH)
sync.connect("tcp://localhost:5564")
sync.send("")
while True:
data = subscriber.recv()
print data
if data == "END":
break
durapub.py
import zmq
import time
context = zmq.Context()
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")
publisher.setsockopt(zmq.HWM, 2)
sync_request = sync.recv()
for n in xrange(10):
msg = "Update %d" % n
publisher.send(msg)
time.sleep(1)
publisher.send("END")
上面的建议是有效的,但没有正确解决此特定代码中的问题。
这里真正的问题是,在
durapub.py
中,你在调用 publisher.setsockopt(zmq.HWM, 2)
之后再调用 publisher.bind
。您应该在 setsockopt
或 bind
之前致电 connect
。
setsockopt请参考0MQ API文档:
注意:除 ZMQ_SUBSCRIBE、ZMQ_UNSUBSCRIBE 和 ZMQ_LINGER 之外的所有选项仅对后续套接字绑定/连接生效。
2024年更新:
对于最新版本的ZMQ(4.3.6+),您不再需要在bind/connect调用之前设置HWM。现在可以在绑定/连接后设置许多选项...请参阅文档。
但是,请注意,您可能仍然会看到 ZMQ 缓冲的消息多于 HWM 消息。原因是 HWM 只限制了内存消息数。 ZMQ 会将消息发送到内核缓冲区,这些消息显然不计入内存中消息计数(尽管您仍然可能会看到程序的内存使用量上升)。要限制内核缓冲区,您需要设置 SNDBUF/RCVBUF 选项。 Linux 上的最小缓冲区对于 SND 为 2048,对于 RCV 为 256;根据消息的大小,这可能意味着即使使用最小的内核缓冲(可能不推荐),您仍然可以获得>一次缓冲的 HWM 消息。
虽然我们不能保证缓冲消息的限制,但我建议这样:
SNDBUF = average_message_size * desired_kernel_messages
SNDHWM = desired_hwm - desired_kernel_messages