我有一个使用ZeroMQ进行消息传递的C ++应用程序。但它还必须为基于AJAX / Comet的Web服务提供SGCI连接。
为此,我需要一个普通的TCP套接字。我可以通过普通的Posix套接字做到这一点,但是要保持跨平台的便携性并让我的生活更轻松(我希望...)我正在考虑使用Boost :: ASIO。
但现在我有ZMQ的冲突想要使用它自己的zmq_poll()
和ASIO它是io_service.run()
...
有没有办法让ASIO与0MQ zmq_poll()
一起工作?
或者是否有其他推荐的方法来实现这样的设置?
注意:我可以通过使用多个线程来解决这个问题 - 但它只是一个单核心/ CPU盒子,它将以非常低的SCGI流量运行该程序,因此多线程将浪费资源......
ZMQ_FD:检索与套接字关联的文件描述符ZMQ_FD选项应检索与指定套接字关联的文件描述符。返回的文件描述符可用于将套接字集成到现有的事件循环中; ØMQ库应通过使文件描述符准备好进行读取,以边沿触发的方式发送套接字上的任何挂起事件。
我认为你可以使用null_buffers
为每个zmq_pollitem_t
并将事件循环推迟到io_service
,完全绕过zmq_poll()
。然而,在上述文件中似乎有一些警告,特别是
从返回的文件描述符中读取的能力并不一定表明消息可以从底层套接字读取或写入;应用程序必须检索实际的事件状态,然后检索ZMQ_EVENTS选项。
因此,当你的某个zmq套接字的处理程序被触发时,你必须在处理我认为的事件之前做更多的工作。未编译的伪代码如下
const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
boost::asio::null_buffers(),
[=](const boost::system::error_code& error)
{
if (!error) {
// handle data ready to be read
}
}
);
注意你不必在这里使用lambda,boost::bind
到成员函数就足够了。
最后,我发现有两种可能的解决方案:
.native()
数组中我接受了Sam Miller的答案,因为对我来说这是SCGI案例中最好的解决方案,在这种情况下不断创建和结束新的连接。通过使用ASIO事件循环可以避免处理因此每个更改的acceptor
数组都很麻烦。
获得ZeroMQ的套接字是战斗中最小的部分。 ZeroMQ基于通过TCP分层的socket
,因此如果你走这条路线,你必须在自定义的Boost.Asio io_service中重新实现ZeroMQ。我在使用Boost.Asio创建异步zmq_pollitem_t
服务时遇到了同样的问题,首先只是尝试使用Boost.Asio UDP服务捕获来自ENet客户端的流量。 ENet是一种类似于TCP的TCP协议,所以我在那时实现的只是捕获几乎无用状态的数据包。
Boost.Asio是基于模板的,内置的io_service使用模板基本上包装系统套接字库以创建TCP和UDP服务。我的最终解决方案是创建一个包含ENet库而不是系统套接字库的自定义io_service,允许它使用ENet的传输函数,而不必使用内置的UDP传输重新实现它们。
对于ZeroMQ也可以这样做,但ZeroMQ已经是一个非常高性能的网络库,它已经提供了异步I / O.我认为您可以通过使用ZeroMQ的现有API接收消息并将消息传递到io_service线程池来创建可行的解决方案。这样,仍然可以使用Boost.Asio的reactor模式异步处理消息/任务,而无需重新编写任何内容。 ZeroMQ将提供异步I / O,Boost.Asio将提供异步任务处理程序/工作程序。
现有的io_service仍然可以耦合到现有的TCP套接字,允许线程池同时处理TCP(在您的情况下为HTTP)和ZeroMQ。在ZeroMQ任务处理程序的这种设置中,完全有可能访问TCP服务会话对象,允许您将ZeroMQ消息/任务的结果发送回TCP客户端。
以下只是为了说明这个概念。
zmq_pollitem_t
在这个问题发布2年后,有人发布了一个确实如此的项目。该项目在这里:// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
threads.push_back(thread);
}
while (1) {
char buffer [10];
zmq_recv (responder_, buffer, 10, 0);
io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
。讨论设计的博客文章在这里:this solution。
以下是从自述文件复制的示例代码:
poll()
看起来不错。如果你试试,请在评论中告诉我它是否仍然有效2019年[我可能会在几个月内尝试然后更新这个答案](回购陈旧,最后一次提交是一年前)