Python ZeroMQ PUSH / PULL逻辑,在不丢失任何消息的情况下将高水位线设置到低端拉线器

问题描述 投票:1回答:2

我正在使用简单的一对一PUSH/PULL工作者/服务器python代码发送和接收消息。

工作人员使用PUSH套接字将消息发送到PULL服务器。服务器处理单元不如工作单元强大,因此,在发送太多消息时,服务器的RAM开始增长,直到系统杀死所有东西为止。

我尝试如下设置接收机的高水位标记:

worker_sock_in = ZMQ_CTXT.socket(zmq.PULL)
worker_sock_in.setsockopt(zmq.LINGER, 1000))
worker_sock_in.setsockopt(zmq.RCVTIMEO, 1000)) # detects if the link is broken
worker_sock_in.setsockopt(zmq.RCVHWM, 1000)
worker_sock_in_port = worker_sock_in.bind_to_random_port(listen_addr, port_start, port_end)

下面的代码用于工作人员创建和发送消息:

sock_dest = ZMQ_CTXT.socket(zmq.PUSH)
sock_dest.setsockopt(zmq.LINGER, 1000))
sock_dest.setsockopt(zmq.SNDTIMEO, 1000)) # detects if the link is broken
sock_dest.setsockopt(zmq.SNDHWM, 0) # never block on sending msg
sock_dest.connect(sock_dest_address)
# sends a msg
self.sock_dest.send(msg, zmq.NOBLOCK)

而且似乎可以解决问题,但我猜是服务器只是丢弃了溢出消息,在我的情况下这是不可接受的。

我已经使用此thread开发了我的软件,但是我不确定要了解答案的其他信息部分。

所以问题是,在noblock push / pull zeromq套接字上达到的HWM的实际行为是什么,有没有一种方法来确保Push Pull基础结构能够确保所有已发送的消息将被Pull套接字接收而不会增加其内存或阻塞发件人?

python zeromq distributed-system pyzmq low-latency
2个回答
1
投票

我建议您在中间(发送方和接收方之间)添加一个代理,它将在给定的时间内保存已发送的消息。您必须制定代码逻辑来保存消息,并在服务器未收到特定消息时得到通知。 0mq无法提供保存或恢复丢失的消息的方法。


1
投票

Q是否有[[一种方法具有推挽基础结构保证所有已发送的消息将由拉套接字而不会增加其内存或< [阻止发件人?

一种方法?是的,有:内置

ZeroWarranty

(涵盖以原件的1:1位副本传递或完全不传递的消息)将需要扩展-通过应用程序级别协议(涵盖那些重新发送的消息)未交付,直到确认)或将基础架构移入使用,但使用特定的有保证的交付协议,这将有助于满足这一超出标准的要求-使用

norm://

传输类扩展名并在其中移动示例情况PUSH/PULL仍未进入PUB/SUB, XPUB/XSUB可扩展形式通信模式原型的RTO状态。

libzmq中提供了新的传输选项。 norm_engine.hpp”和norm_engine.cpp文件为ZeroMQ提供了面向NACK的可靠多播(NORM)传输协议选项的实现。 NORM是RFC 5740和支持文档中指定的IETF开放标准协议。海军研究实验室(NRL)提供了位于http://www.nrl.navy.mil/itd/ncs/products/norm的开源参考实现。 NORM支持通过IP多播进行可靠的数据传输,但也支持单播(点对点)数据传输。 NORM在用户数据报协议(UDP)之上运行,并通过基于NACK的自动重复请求(ARQ)支持可靠性,该请求使用分组擦除编码进行非常有效的组通信。 NORM还为支持端到端流控制

提供了TCP友好的自动拥塞控制和机制。 NRL NORM实现也可以配置为提供基本的类UDP尽力而为传输服务(无接收器反馈),并且可以通过向传输中添加一些应用程序可设置的主动前向纠错(FEC)数据包来增强此功能。也就是说,默认情况下,NORM仅发送“反应性” FEC修复数据包响应NACK,但也可以配置为在可靠性水平上主动发送添加的修复数据包,而无需接收方的任何反馈。
除了其TCP-友好的拥塞控制功能,NORM还可以配置为固定速率运行,并且NRL实现支持一些其他的适用于易于误码的无线通信环境的自动拥塞控制选项。尽管其可靠的ARQ操作主要基于NACK(检测到数据包丢失时为否定确认),但它还支持来自接收机的可选肯定确认(ACK),可用于传递确认)和显式流控制。] >Inflating memory要求有两种解决方法:一种-用于.send() -er的

显式控件,而不是向.send() -er侧Context()-实例的资源(RAM ),即在资源受限的限制内(主要是防止任何泛洪/丢弃消息的发生),另一个-具有足够的RAM和正确配置的Context()实例,以允许所有数据流过。


Q

在无块推/拉zeromq套接字上,达到HWM]的实际行为是什么?

首先,让我们揭开神秘面纱。 ZMQ_NOBLOCK指令指向本地.send()Context(),以立即将对.send()方法的调用返回给调用者,即不阻止调用代码执行(消息有效载荷用于无论其内部状态如何,都在本地ZeroMQ Context()-实例内部进行进一步处理-经典的非阻塞代码设计。]

[ZMQ_SNDHWM

相反地指示Context()实例,如何设置此套接字的阈值,并且对于所述PUSH/PULL.send()-er情况:

高水位标记是未完成消息的最大数量的硬性限制ØMQ必须在内存中为指定套接字正在与之通信的任何单个对等方排队。零值表示没有限制。如果已达到此限制,则套接字应进入异常状态,并取决于套接字类型,ØMQ应采取适当的措施,例如阻止或丢弃已发送的消息。有关每种插座类型所采取的确切操作的详细信息,请参见zmq_socket(3)中的各个插座说明。ØMQ不保证套接字接受多达ZMQ_SNDHWM条消息,实际限制可能会降低多达60-70%,具体取决于套接字上的消息流。

也使用ZMQ_XPUB_NODROP可能有助于

norm://

-运输类用例。还请注意,默认情况下,ZMQ_PUSH-套接字的API会确认:

ZMQ_PUSH套接字由于已到达所有下游节点的高水位线而进入静音状态时,或者如果根本没有下游节点,则套接字上的任何zmq_send(3)操作

应阻止

直到静音状态结束或至少一个下游节点可用于发送; 消息不会被丢弃。对于表现欠佳的可疑对象(PULL侧,也请使用
.getsockopt( ZMQ_RCVBUF )
-方法,并在尺寸上调整适当且足够大的[C0 ],根据需要:

.setsockopt( ZMQ_RCVBUF )选项应将套接字的底层内核接收缓冲区大小设置为指定的字节大小。值-1表示保持操作系统默认值不变。有关详细信息,请参阅操作系统文档中的ZMQ_RCVBUF套接字选项。

如果以上方法无济于事,则可以使用


SO_RCVBUF服务将自我诊断元平面注入ZeroMQ基础架构中,并获得对情况的完全控制,这些情况通常在应用程序看不见的情况下发生代码(根据需要反映内部API状态和转换)。

决定由您决定。

© www.soinside.com 2019 - 2024. All rights reserved.