我想开发一种可靠的多客户端、单服务器请求/代表通信模式。我决定使用 ZMQ_REQ 和 ZMQ_ROUTER 套接字来完成此任务。
ZeroMQ RFC 声明如下:
ROUTER 套接字类型应在对等方创建双队列时 连接到它。如果该对等方断开连接,则 ROUTER 套接字应 销毁其双队列并应丢弃其包含的任何消息。
根据此语义,我希望消息 ROUTER 丢弃断开连接的客户端。我开发了以下代码来测试我的设计。
// Client Code
// cppzmq: v4.10.0
// libzmq: v4.3.5
#include <zmq.hpp>
#include <iostream>
zmq::context_t context{1};
bool reqrep(zmq::message_t &req, std::string addr)
{
zmq::socket_t client{context, zmq::socket_type::req};
client.set(zmq::sockopt::rcvtimeo, 2500);
client.set(zmq::sockopt::sndtimeo, 2500);
client.set(zmq::sockopt::immediate, true);
client.set(zmq::sockopt::linger, 0);
client.connect(addr);
if (!client.send(req))
{
std::cerr << "E: send timeout\n";
return false;
}
zmq::message_t rep;
if (client.recv(rep))
{
std::cout << "I: server replied OK (" << rep.to_string() << ")\n";
return true;
}
else
{
std::cerr << "E: receive timeout\n";
return false;
}
}
int main()
{
std::string addr{"ipc:///tmp/server"};
int seq{};
while (true)
{
zmq::message_t req{std::to_string(seq++)};
reqrep(req, addr);
}
return 0;
}
// Server Code
// cppzmq: v4.10.0
// libzmq: v4.3.5
#include <unistd.h>
#include <zmq_addon.hpp>
#include <iostream>
// Provide random number from 0..(num-1)
#define within(num) (int)((float)((num) * random()) / (RAND_MAX + 1.0))
int main()
{
zmq::context_t context(1);
zmq::socket_t server(context, ZMQ_ROUTER);
server.bind("ipc:///tmp/server");
while (1)
{
zmq::multipart_t mp;
auto req = zmq::recv_multipart(server, std::back_inserter(mp));
std::cout << "I: message received";
std::cout << mp.str() << std::endl;
if (!within(20))
{
std::cout << "I: simulating CPU overload" << std::endl;
sleep(20);
}
sleep(1); // Do work
zmq::send_multipart(server, mp);
}
return 0;
}
结果不符合我的期望(左:服务器,右:客户端)。休眠 20 秒后,服务器仍然保留来自断开连接的对等方的消息并处理它们。
有什么我想念的吗?我会寻找反馈。
连接在zeromq上下文线程中异步管理,因此除非您在客户端上调用
disconnect
或在服务器上调用unbind
(或者存在真正的网络问题),否则不会发生断开连接。
您可以使用
socket_monitor
进一步调查
https://libzmq.readthedocs.io/en/latest/zmq_socket_monitor.html