我的消息总线仅将消息从其 PULL 套接字转发到 PUB 套接字。它作为系统其他部分的通信媒介,使用 PUSH 套接字发送消息并使用 SUB 套接字接收消息。
总线上的大部分通信发生在守护进程之间。但也可以有程序连接到总线,立即发送问题,等待某人回复并断开连接。
问题是使用 SUB 套接字订阅不是阻塞操作。在有人回答问题之前,订阅可能尚未在消息总线 PUB 套接字中注册。
这里是一个示例,其中 20 个
ephemeral_program
连接到总线,发送消息并期望从总线接收回相同的消息。
有些人收到消息,但有些人没有。一种不可靠的解决方案是在发送消息之前休眠一小段时间。更好的方法是通过向总线发送消息来循环进行订阅检查,直到收到其中一条消息。我真的不喜欢这个,因为它基本上是轮询操作,而不仅仅是发布者准备好时的一次确认。
但是这是唯一的方法还是有一些套接字选项或另一个一对多套接字对用于订阅者/侦听器端以确保发布者端已注册订阅?
import time
from multiprocessing import Process
from zmq import Context, Poller, Socket
from zmq.constants import POLLIN, PUB, PULL, PUSH, SUB
def message_bus():
context = Context.instance()
in_socket: Socket = context.socket(PULL)
in_socket.bind("ipc:///tmp/in-socket.ipc")
out_socket: Socket = context.socket(PUB)
out_socket.bind("ipc:///tmp/out-socket.ipc")
while True:
msg = in_socket.recv_string()
out_socket.send_string(msg)
# Daemon to create some traffic
def daemon():
context = Context.instance()
receive_socket: Socket = context.socket(SUB)
receive_socket.connect("ipc:///tmp/out-socket.ipc")
receive_socket.subscribe("")
send_socket: Socket = context.socket(PUSH)
send_socket.connect("ipc:///tmp/in-socket.ipc")
poller = Poller()
poller.register(receive_socket, POLLIN)
while True:
polling_result = poller.poll(timeout=1)
if polling_result:
receive_socket.recv_string()
else:
send_socket.send_string("Hi!")
def ephemeral_program(i):
context = Context.instance()
send_socket: Socket = context.socket(PUSH)
send_socket.connect("ipc:///tmp/in-socket.ipc")
receive_socket: Socket = context.socket(SUB)
receive_socket.connect("ipc:///tmp/out-socket.ipc")
receive_socket.subscribe(f"ephemeral_program_{i}")
poller = Poller()
poller.register(receive_socket, POLLIN)
# time.sleep(1)
# This would in most cases give enough time to register the
# subscription on the PUB side.
# Better option is to confirm subscription using loop
# while True:
# send_socket.send_string(f"ephemeral_program_{i}:am I subscribed?")
# polling_result = poller.poll(timeout=1)
#
# if polling_result:
# msg = receive_socket.recv_string()
# if msg == f"ephemeral_program_{i}:am I subscribed?":
# print("Subscription confirmed")
# break
# else:
# print("Not yet subscribed")
# Could be question to someone else
send_socket.send_string(f"ephemeral_program_{i}:Hello")
polling_result = poller.poll(timeout=100)
if polling_result:
print(receive_socket.recv_string())
else:
print("Did not receive message")
message_bus_p = Process(target=message_bus, args=())
message_bus_p.start()
n = 20
daemons_p = [Process(target=daemon, args=()) for _ in range(n)]
for daemon_p in daemons_p:
daemon_p.start()
# Allow the message bus and daemons to ramp up as if they've
# been running for some time
time.sleep(0.2)
ephemeral_programs_p = [
Process(target=ephemeral_program, args=(i,)) for i in range(n)
]
for ephemeral_program_p in ephemeral_programs_p:
ephemeral_program_p.start()
for ephemeral_program_p in ephemeral_programs_p:
ephemeral_program_p.join()
for daemon_p in daemons_p:
daemon_p.kill()
message_bus_p.kill()
使用 XPUB 和 XPUB_VERBOSE 套接字选项,您可以读取到达的订阅并发送确认消息。然后只需等待
ephemeral_program
中的确认即可。
from zmq import Context, Poller, Socket
from zmq.constants import POLLIN, PULL, PUSH, SUB, XPUB, XPUB_VERBOSE
def message_bus():
context = Context.instance()
in_socket: Socket = context.socket(PULL)
in_socket.bind("ipc:///tmp/in-socket.ipc")
out_socket: Socket = context.socket(XPUB)
out_socket.bind("ipc:///tmp/out-socket.ipc")
out_socket.setsockopt(XPUB_VERBOSE, True)
poller = Poller()
poller.register(in_socket, POLLIN)
poller.register(out_socket, POLLIN)
while True:
socks = dict(poller.poll())
if in_socket in socks:
msg = in_socket.recv_string()
out_socket.send_string(msg)
if out_socket in socks:
msg = out_socket.recv_multipart()
first_part = msg[0]
subscription_prefix = b'\x01'
if first_part.startswith(subscription_prefix):
topic_bytes = first_part[len(subscription_prefix) :]
topic = str(topic_bytes, encoding="UTF8")
out_socket.send_string(f"{topic}:subscription_ack")
def ephemeral_program(i):
context = Context.instance()
send_socket: Socket = context.socket(PUSH)
send_socket.connect("ipc:///tmp/in-socket.ipc")
receive_socket: Socket = context.socket(SUB)
receive_socket.connect("ipc:///tmp/out-socket.ipc")
poller = Poller()
poller.register(receive_socket, POLLIN)
receive_socket.subscribe(f"ephemeral_program_{i}")
# Wait for subscription ack or any message with subscribed topic
polling_result = poller.poll(timeout=10000)
if polling_result:
msg = receive_socket.recv_string()
print(f"Subscription confirmed {msg}")
else:
print("Couldn't subscribe")
return
# Could be question to someone else
send_socket.send_string(f"ephemeral_program_{i}:Hello")
polling_result = poller.poll(timeout=100)
if polling_result:
print(receive_socket.recv_string())
else:
print("Did not receive message")