ZeroMQ:除了轮询或睡眠之外,是否没有其他方法来检查套接字是否已连接?

问题描述 投票:0回答:1

我们正在使用ZeroMQ进行一个项目,其架构如下:

这基本上是我们用 C 语言编写的使用 ZeroMQ 的模块:

#define RET_ERROR(x) syslog(LOG_ERR, x "%s", zmq_strerror(errno)); cleanupZMQ(); return false;

static zmq_ctx_t ctx = NULL;
static zmq_sock_t pub_sock = NULL;
static zmq_sock_t sub_sock = NULL;
static zmq_sock_t sub_mon_sock = NULL;
static zmq_sock_t pub_mon_sock = NULL;

static char* subscriptions[] = { REQ_TOPIC };
static size_t subscriptions_count = (sizeof(subscriptions) / sizeof(subscriptions[0]));

static inline void cleanupZMQ() {
    if (pub_sock) {
        zmq_close(pub_sock);
    }

    if (sub_sock) {
        zmq_close(sub_sock);
    }

    if (sub_mon_sock) {
        zmq_close(sub_mon_sock);
    }

    if (pub_mon_sock) {
        zmq_close(pub_mon_sock);
    }

    if (ctx) {
       zmq_ctx_destroy(ctx);
    }
}

static bool waitForConnect(zmq_sock_t monitor) {
    zmq_msg_t msg;

    bool ret = true;
    bool connected = false;
    bool handshaked = false;
    do {
        zmq_msg_init(&msg);
        int rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive first frame from monitor: ")
        }
        uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
        uint16_t event = *((uint16_t*)data);
        uint32_t value = *((uint32_t*)data+2);

        zmq_msg_init(&msg);
        rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive second frame from monitor: ")
        }
        char* addr = (char*)zmq_msg_data(&msg);
        syslog(LOG_DEBUG, "Event: %u, Value: %u, Addr: %s", event, value, addr);
        if (event == ZMQ_EVENT_CONNECTED) {
            syslog(LOG_INFO, "Connected to '%s'.", addr);
            connected = true;
        } else if (event == ZMQ_EVENT_CONNECT_DELAYED) {
            syslog(LOG_NOTICE, "Connecting delayed!");
        } else if (event == ZMQ_EVENT_CONNECT_RETRIED) {
            syslog(LOG_NOTICE, "Connecting retried!");
        } else if ((event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL)
            || (event == ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL)
            || (event == ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)) {
            syslog(LOG_ERR, "Error! Handshake with '%s' failed: %s", addr, zmq_strerror(value));
            handshaked = true;
            ret = false;
        } else if (event == ZMQ_EVENT_HANDSHAKE_SUCCEEDED) {
            syslog(LOG_INFO, "Handshake with '%s' succeded.", addr);
            handshaked = true;
            ret = true;
        } else {
            syslog(LOG_NOTICE, "Unexpected event: %u", event);
        }
    } while(!(connected && handshaked));

    zmq_msg_close(&msg);
    return ret;
}

bool startZMQ() {
    if (ctx == NULL) {
        ctx = zmq_ctx_new();
        if (ctx == NULL) {
            RET_ERROR("Error! Can not open ZMQ context: ");
        }
    } else {
        syslog(LOG_INFO, "ZMQ is already started.");
    }

    if (sub_sock == NULL) {
        sub_sock = zmq_socket(ctx, ZMQ_SUB);
        if (sub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub socket: ");
        }

        if (zmq_socket_monitor(sub_sock, SUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ sub socket: ");
        }

        sub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (sub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub-monitor socket: ");
        }

        if (zmq_connect(sub_mon_sock, SUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub-monitor socket: ");
        }

        for (size_t i=0; i<subscriptions_count; i++) {
            if (zmq_setsockopt(sub_sock, ZMQ_SUBSCRIBE, subscriptions[i], strlen(subscriptions[i]))) {
                syslog(LOG_ERR, "Error! Can not subscribe to topic '%s': %s", subscriptions[i], zmq_strerror(errno));
            } else {
                syslog(LOG_INFO, "Subscribed to '%s'.", subscriptions[i]);
            }
        }

        if (zmq_connect(sub_sock, SUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub socket: ");
        }
        waitForConnect(sub_mon_sock);
    } else {
        syslog(LOG_INFO, "Subscriber socket is already open.");
    }

    if (pub_sock == NULL) {
        pub_sock = zmq_socket(ctx, ZMQ_PUB);
        if (pub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub socket: ");
        }

        if (zmq_socket_monitor(pub_sock, PUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ pub socket: ");
        }

        pub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (pub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub-monitor socket: ");
        }

        if (zmq_connect(pub_mon_sock, PUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub-monitor socket: ");
        }

        if (zmq_connect(pub_sock, PUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub socket: ");
        }

        waitForConnect(pub_mon_sock);
    } else {
        syslog(LOG_INFO, "Publisher socket is already open.");
    }

    sleep(3);
    return true;
}

size_t sendZMQMsg(const char* topic, size_t topic_len, const msg_buffer_t msg, size_t msg_len) {
    size_t sended = 0;
    int rc = zmq_send(pub_sock, topic, topic_len, ZMQ_SNDMORE);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ topic: %s", zmq_strerror(errno));
        return 0;
    }
    sended += rc;
    rc = zmq_send(pub_sock, msg, msg_len, 0);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ message: %s", zmq_strerror(errno));
        return 0;
    }
    sended += rc;
    return sended;
}

void endZMQ() {
    cleanupZMQ();
}

如您所见,我们必须在

sleep(3)
函数的末尾添加一个
startZMQ()
。如果没有这个,前几条发送的消息将会丢失。

我们当然知道这种“慢加入综合症”。我们确保代理在连接任何内容之前已准备好,并且订阅者在发布者之前连接(也有三秒延迟)。但是,我们仍然必须等待这三秒钟,发布者才能使用他们的套接字。 由于中央代理,发布者和订阅者彼此不认识,我们不希望他们必须直接相互连接,因为我们有很多这两个部分,如果有人必须直接连接到其他人,系统基本上无法维护。

我们发现了这个问题和这个一个,当然我们阅读了指南,特别是这个部分,在指南本身中使用了

sleep(1)
,并且使用第二个套接字对进行轮询是受到推崇的。 除了轮询之外,这个库中真的没有其他方法来检查您的套接字是否已准备好吗?

如您所见,我们已经使用 zmq-monitor-sockets 在

waitForConnect
函数中捕获了 zmq 事件。这还不够吗?我们在这里遗漏了什么吗?

c zeromq
1个回答
0
投票

从根本上讲,您受到 tcp(支撑 ZMQ)缓冲这一事实的限制。 ZMQ 做了很多确保发送的消息被传递(心跳、整个消息传递等),只要连接处于活动状态,但这并不能解决所有问题。

tcp和ZMQ都实现了“Actor模型”;发送的数据由传输缓冲。在接收者调用recv()之前,发送者可以从send()返回。如果您想要绝对保证送达(或确认未送达),您需要将某种确认消息传回发件人。

您最终得到的是通信顺序流程。这是 Actor 模型,但发送者和接收者之间的连接不会缓冲正在传输的数据。在接收者的recv() 完成之前,send() 不会完成。这充当执行集合点(发送行为意味着您的代码知道其执行过程中的位置 - 即 recv() 已完成)。如果您有(准)实时要求,这尤其有用;您的发件人可以知道收件人是否跟不上!

CSP 在 Go 和 Rust 中实现 - 它有点重新流行起来。

© www.soinside.com 2019 - 2024. All rights reserved.