zeromq: ZMQ_CONFLATE==1并不能阻止队列保存旧的消息。

问题描述 投票:0回答:1

在ZeroMQ和CPPZMQ 4.3.2的情况下,我想删除所有套接字的旧消息,包括

  • PAIR
  • PubSub
  • REQREP

所以我用 m_socks[channel].setsockopt(ZMQ_CONFLATE, 1) 在绑定连接之前,在我所有的套接字上进行测试。

测试

但是,当我做了下面的测试后,似乎每次重新连接时,旧的消息还是会被冲掉。在这个测试中。

  • 我使用一个线程不断地将生成的正弦波发送到一个接收线程中
  • 每10秒我就把正弦波的频率提高一倍
  • 10秒后,我停止了这个过程

以下是发件人的伪代码。

// on sender end

auto thenSec = high_resolution_clock::now();
while(m_isRunning) {

    // generate sinewave, double the frequency every 10s or so

    auto nowSec = high_resolution_clock::now();
    if (duration_cast<seconds>(nowSec - thenSec).count() > 10) {
        m_sine.SetFreq(m_sine.GetFreq()*2);
        thenSec = nowSec;
    }
    m_sine.Generate(audio);

    // send to rendering thread

    m_messenger.send("inproc://sound-ear.pair", 
        (const void*)(audio), 
        audio_size, 
        zmq::send_flags::dontwait
    );

}

注意,我已经用DONTWAIT来减轻阻挡。

在接收端,我有一个 zmq::poller_event 处理程序,简单地接收事件轮询的最后一条消息。

在停止序列中,我将正弦波频率重置为最低值,比如440Hz。

预期的

预期的行为将是。

  • 如果我在10秒后停止发送器和接收器的工作 当频率翻倍时
  • 我重新启动两个。
  • 那么我应该看到正弦波复位到440Hz。

观察到的

但观察到的行为是,重新开始通信后,接收到的正弦波仍然是双倍的频率,即880Hz。

疑问

我是否做错了,或者我应该使用某种killswitch来强制丢弃所有消息?

caching zeromq latency
1个回答
0
投票

好吧,我想我自己解决了。算是解决了吧。

实际解决方案

我终于意识到,我想要的行为是在停止渲染时刷新所有消息。根据 公文本(如何刷新ZeroMQ套接字队列中的所有消息?),这只能通过

  • 设置发送方和接收方的套接字。ZMQ_LINGER 选项为0,意思是在关闭这些套接字时不做任何事情。
  • 关闭发送端和接收端的套接字,这也涉及到引导轮询器和所有对套接字的引用。

如果我要重新开始渲染我的数据,就在停止序列之后,这似乎是很多不必要的工作。但我没有找到其他方法来干净利落地解决这个问题。

最初的努力

在我看来 ZMQ_CONFLATE 无所谓 PAIR. 我真的要求助于... ZMQ_SNDHWMZMQ_RCVHWM 来调整发送端和接收端的高水印。

但是,我说 "有点 "的原因有几个,就是在最后调整HWM并不是实时应用的最佳方案。

  • 将ZMQ_SNDHWM ZMQ_RCVHWM设置为最小值 "1",对于一个实时应用来说,我们仍然有很大的延迟。

  • 另外,如果消费者线程请求更繁忙的流量,我在最低的HWM下会有可感知的抖动。

如果我没有做错什么,我想对于我的目标场景来说,最优的解决方案还是共享内存。这是很悲哀的,因为我真的很喜欢ZMQ的多播消息模式的简单性,讨厌到处都是线程锁。

© www.soinside.com 2019 - 2024. All rights reserved.