ZeroMQ XSUB / XPUB proxy()API未返回

问题描述 投票:1回答:1

我使用POCO库编写了一个演示Windows服务。

根据POCO API,函数waitForTerminationRequest()等待服务终止请求。

现在,在这个基于POCO的Windows服务中,我想启动一个基于ZeroMQ库的Message Queue代理来实现XSUB / XPUB消息队列。

更多可以在这里学到http://zguide.zeromq.org/page:all

为此我写了另一个class ZeroMQProxy,它在服务的main函数中启动代理。

class ZeroMQProxy
{
private:
    zmq::context_t context;
    zmq::socket_t xsub;
    zmq::socket_t xpub;

public:
    ZeroMQProxy()
        : context(1),
        xsub(context, ZMQ_XSUB),        // Publisher End Proxy Sockets
        xpub(context, ZMQ_XPUB)         // Subscriber End Proxy Sockets
    {
    }

    ~ZeroMQProxy()
    {
    }

    void proxyopen()
    {
        xsub.bind("tcp://*:5559");
        xpub.bind("tcp://*:5560");

        zmq::proxy(xsub, xpub, nullptr);
    }

    void proxyclose()
    {
    }
};

class demopocoservice : public ServerApplication
{
private:
    bool _helpRequested;

public:
    demopocoservice() : _helpRequested(false)
    {
    }

    ~demopocoservice()
    {
    }

protected:
    void initialize(Application& self)
    {
        loadConfiguration();
        ServerApplication::initialize(self);
    }

    void uninitialize()
    {
        ServerApplication::uninitialize();
    }

    void defineOptions(OptionSet& options)
    {
        ServerApplication::defineOptions(options);

        options.addOption(
            Option("help", "h", "poco display help")
            .required(false)
            .repeatable(false)
            .callback(OptionCallback<demopocoservice>(
                this, &demopocoservice::handleHelp)));
    }

    void handleHelp(const std::string& name,
        const std::string& value)
    {
        _helpRequested = true;
        displayHelp();
        stopOptionsProcessing();
    }

    void displayHelp()
    {
        HelpFormatter helpFormatter(options());
        helpFormatter.setCommand(commandName());
        helpFormatter.setUsage("OPTIONS");
        helpFormatter.setHeader("poco: Zero message Queue.");
        helpFormatter.format(std::cout);
    }

    int main(const ArgVec& args)
    {
        if (!_helpRequested)
        {
            ZeroMQProxy zmqproxyObj;
            zmqproxyObj.proxyopen();

            waitForTerminationRequest();

            zmqproxyObj.proxyclose();
        }
        return Application::EXIT_OK;
    }
};


int main(int argc, char** argv)
{
    cout << "Hello Poco ZMQ\n";

    demopocoservice pobj;
    pobj.run(argc, argv);

    return 0;
}

我的目标是当我启动服务然后zmq::proxy()应该启动代理,当我停止服务时,代理应该与套接字一起关闭。

问题是zmq::proxy()没有返回。所以我无法停止服务。 即使我做net stop <service name>,因为waitForTerminationRequest()zmq::proxy()没有收到终止请求。

停止服务后,如何停止/关闭代理?

zeromq poco poco-libraries
1个回答
1
投票

ZeroMQ API证实了这一点:

描述

zmq_proxy()函数在当前应用程序线程中启动内置的ØMQ代理。 ...

在调用zmq_proxy()之前,您必须设置任何套接字选项,并连接或绑定frontendbackend套接字。两种传统的代理模型是:

zmq_proxy()在当前线程中运行,仅在当前上下文关闭时返回。

鉴于这一事实,最好实例化一个独立的线程让代理在那里运行并让调用者返回,以便继续你的其他代码执行流程独立于独立的zmq_proxy()执行。

另一个公平的举措是(始终)设置LINGER == 0,然后采取新的实例化套接字进行任何进一步的步骤和/或措施。

© www.soinside.com 2019 - 2024. All rights reserved.