ZMQ_CONFLATE套接字选项不会收到任何消息

问题描述 投票:2回答:2

我试图找到为什么启用ZMQ_CONFLATE选项导致不接收消息的原因。

我重新创建了这个最小的测试用例(我的应用程序使用XPUB / XSUB代理,但是,它似乎不会改变此测试的结果):

#include <atomic>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <zmq.hpp>

#define USE_PROXY

std::atomic<bool> stop{false};

void pub_thread(zmq::context_t &context)
{
    zmq::socket_t pub(context, zmq::socket_type::pub);
#ifdef USE_PROXY
    pub.connect("tcp://localhost:38922");
#else
    pub.bind("tcp://*:38923");
#endif
    long i = 0;
    for(;;)
    {
        if(stop) break;
        std::string m = boost::lexical_cast<std::string>(i);
        zmq::message_t hdr(6);
        memcpy(hdr.data(), "topic1", 6);
        zmq::message_t msg(m.size());
        memcpy(msg.data(), m.data(), m.size());
        std::cout << "send: " << m << std::endl;
        if(!pub.send(hdr, ZMQ_SNDMORE) || !pub.send(msg))
            std::cout << "send error" << std::endl;
        i++;
        boost::this_thread::sleep_for(boost::chrono::milliseconds{20});
    }
}

void sub_thread(zmq::context_t &context)
{
    zmq::socket_t sub(context, zmq::socket_type::sub);
    const int v_true = 1;
    sub.setsockopt(ZMQ_CONFLATE, &v_true, sizeof(v_true));
#ifdef USE_PROXY
    sub.connect("tcp://localhost:38921");
#else
    sub.connect("tcp://localhost:38923");
#endif
    sub.setsockopt(ZMQ_SUBSCRIBE, "topic1", 6);
    for(;;)
    {
        if(stop) break;
        zmq::message_t hdr, msg;
        if(!sub.recv(&hdr) || !hdr.more() || !sub.recv(&msg))
            std::cout << "recv error" << std::endl;
        std::string m(reinterpret_cast<const char*>(msg.data()), msg.size());
        std::cout << "                recv: " << m << std::endl;
        boost::this_thread::sleep_for(boost::chrono::milliseconds{250});
    }
}

void proxy_thread(zmq::context_t &context)
{
#ifdef USE_PROXY
    zmq::socket_t xpub(context, zmq::socket_type::xpub);
    xpub.bind("tcp://*:38921");
    zmq::socket_t xsub(context, zmq::socket_type::xsub);
    xsub.bind("tcp://*:38922");
    std::cout << "starting xpub/xsub proxy" << std::endl;
    zmq::proxy(xpub, xsub, nullptr);
    std::cout << "xpub/xsub proxy terminated" << std::endl;
#endif
}

void timeout_thread()
{
    boost::this_thread::sleep_for(boost::chrono::seconds{4});
    stop = true;
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    exit(0);
}

int main(int argc, char **argv)
{
    zmq::context_t context(1);
    boost::thread t0(&timeout_thread);
    boost::thread t1(&proxy_thread, boost::ref(context));
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    boost::thread t2(&sub_thread, boost::ref(context));
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    boost::thread t3(&pub_thread, boost::ref(context));
    t0.join();
}

简要说明:我们有4个主题:

  • pub thread:每隔20ms将一个递增计数器的值写入PUB套接字
  • 子线程:每隔250ms从SUB套接字读取一次值(消息应该排队,但由于conflate选项,应该丢弃除最近的)
  • 代理线程:运行XPUB / XSUBN代理(如果定义了USE_PROXY)
  • 超时线程:4秒后停止一切

我观察到的输出如下:

starting xpub/xsub proxy
send: 0
send: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87

即没有收到任何消息。

预期的输出应该是这样的:

starting xpub/xsub proxy
send: 0
send: 1
                recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
                recv: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
                recv: 21
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
                recv: 33
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
                recv: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
                recv: 55
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
                recv: 66
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
                recv: 77
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87

我尝试在sub.setsockopt(ZMQ_CONFLATE,...之后移动sub.connect(...,但在这种情况下它没有效果,与删除ZMQ_CONFLATE行的效果相同:

starting xpub/xsub proxy
send: 0
send: 1
                recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
                recv: 2
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
                recv: 3
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
                recv: 4
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
                recv: 5
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
                recv: 6
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
                recv: 7
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
                recv: 8
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87

ZMQ版本:4.2.5

c++ zeromq
2个回答
3
投票

您正在使用与ZMQ_CONFLATE不兼容的多部分消息

ZMQ_CONFLATE:仅保留最后一条消息如果设置,套接字应在其入站/出站队列中只保留一条消息,此消息是最后一条消息/最后一条消息要发送。忽略ZMQ_RECVHWM和ZMQ_SENDHWM选项。不支持多部分消息,特别是,它只保留在套接字内部队列中的一部分。

如果您更改代码以发送一个消息部分(整数)并订阅所有sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);,则代码会生成您期望的结果。


0
投票

在订阅套接字中,尝试在连接之前设置ZMQ_CONFLATE

来自文档:

http://api.zeromq.org/4-2:zmq-setsockopt

注意:所有选项,与ZMQ_SUBSCRIBE,ZMQ_UNSUBSCRIBE,ZMQ_LINGER,ZMQ_ROUTER_HANDOVER,ZMQ_ROUTER_MANDATORY,ZMQ_PROBE_ROUTER,ZMQ_XPUB_VERBOSE,ZMQ_XPUB_VERBOSER,ZMQ_REQ_CORRELATE,ZMQ_REQ_RELAXED,ZMQ_SNDHWM和ZMQ_RCVHWM外,只需要为后续套接字绑定效果/连接。

很好的示例代码,很高兴看到您的预期结果。

© www.soinside.com 2019 - 2024. All rights reserved.