未收到ZMQ消息

问题描述 投票:1回答:1

请原谅我,如果我遗漏了一些简单的东西,这是我第一次使用消息传递做任何事情,我从其他人那里继承了这个代码库。

我试图从IP 10.10.10.200的Windows机器发送消息到IP为10.10.10.15的Ubuntu机器。

从Windows机器运行TCPView时,我得到以下结果,这让我怀疑问题在于Ubuntu机器。如果我正确读取,那么我在Windows机器上的应用程序已在端口5556上创建了一个连接,这是它应该做的。如果我错了,我也会包含windows代码。

my_app.exe  5436    TCP MY_COMPUTER 5556    MY_COMPUTER 0   LISTENING                                       

Windows应用代码:

    void 
    NetworkManager::initializePublisher()
    {
        globalContext = zmq_ctx_new();

        globalPublisher = zmq_socket(globalContext, ZMQ_PUB);

        string protocol = "tcp://*:";
        string portNumber = PUBLISHING_PORT; //5556
        string address = protocol + portNumber;
        char *address_ptr = new char[address.size() + 1];
        strncpy_s(address_ptr, address.size() + 1, address.c_str(), address.size());

        int bind_res = zmq_bind(globalPublisher, address_ptr);
        if (bind_res != 0)
        {
            cerr << "FATAL: couldn't bind to port[" << portNumber << "] and protocol [" << protocol << "]" << endl;

        }
        cout << " Connection: " << address << endl;
    }

void 
NetworkManager::publishMessage(MESSAGE msgToSend)
{

    // Get the size of the message to be sent
    int sizeOfMessageToSend = MSG_MAX_SIZE;//sizeof(msgToSend);

    // Copy IDVS message to buffer
    char buffToSend[MSG_MAX_SIZE] = "";

    // Pack the message id
    size_t indexOfId = MSG_ID_SIZE + 1;
    size_t indexOfName = MSG_NAME_SIZE + 1;
    size_t indexOfdata = MSG_DATABUFFER_SIZE + 1;

    memcpy(buffToSend, msgToSend.get_msg_id(), indexOfId - 1);

    // Pack the message name
    memcpy(buffToSend + indexOfId, msgToSend.get_msg_name(), indexOfName - 1);

    // Pack the data buffer
    memcpy(buffToSend + indexOfId + indexOfName, msgToSend.get_msg_data(), indexOfdata - 1);

// Send message
    int sizeOfSentMessage = zmq_send(globalPublisher, buffToSend, MSG_MAX_SIZE, ZMQ_DONTWAIT);

    getSubscriptionConnectionError();

    // If message size doesn't match, we have an issue, otherwise, we are good
    if (sizeOfSentMessage != sizeOfMessageToSend)
    {
        int errorCode = zmq_errno();
        cerr << "FATAL: couldn't not send message." << endl;
        cerr << "ERROR: " << errorCode << endl;
    }
}

如果您认为需要,我可以包含更多此方的代码,但是Ubuntu方面会出现错误,所以我将重点放在那里。

问题是,当我调用zmq_recv它返回-1时,当我检查zmq_errno时,我得到EAGAIN(请求非阻塞模式,此刻没有消息可用。)我还检查了netstat,我没有在端口上看到任何东西5556

首先是连接到发布者的函数,然后是获取数据的函数,然后是main。 Ubuntu侧码:

void
*connectoToPublisher()
{
    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);

    string protocol = "tcp://";
    string ipAddress = PUB_IP;      //10.10.10.15
    string portNumber = PUB_PORT;  // 5556
    string address = protocol + ipAddress + ":" + portNumber;
    cout << "Address: " << address << endl;

    char *address_ptr = new char[address.size() + 1];
    strcpy(address_ptr, address.c_str());

    // ------ Connect to Publisher ------
    bool isConnectionEstablished = false;
    int connectionStatus;
    while (isConnectionEstablished == false)
    {
        connectionStatus = zmq_connect(subscriber, address_ptr);

        switch (connectionStatus)
        {
        case 0: //we are good.
            cout << "Connection Established!" << endl;
            isConnectionEstablished = true;
            break;
        case -1:
            isConnectionEstablished = false;
            cout << "Connection Failed!" << endl;
            getSubscriptionConnectionError();
            cout << "Trying again in 5 seconds..." << endl;
            break;
        default:
            cout << "Hit default connecting to publisher!" << endl;
            break;
        }

        if (isConnectionEstablished == true)
        {
            break;
        }
        sleep(5); // Try again
    }

    // by the time we get here we should have connected to the pub
    return subscriber;
}

static void *
getData(void *subscriber)
{
    const char *filter = ""; // Get all messages
    int subFilterResult = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
    // ------ Get in main loop ------
    while (1)
    {
        //get messages from publisher
        char bufferReceived[MSG_MAX_SIZE] = "";
        size_t expected_messageSize = sizeof(bufferReceived);
        int actual_messageSize = zmq_recv(subscriber, bufferReceived, MSG_MAX_SIZE, ZMQ_DONTWAIT);

        if (expected_messageSize == actual_messageSize)
        {
            MESSAGE msg = getMessage(bufferReceived); //Uses memcpy to copy id, name, and data strutct data from buffer into struct of MESSAGE
        if (strcmp(msg.get_msg_id(), "IDXY_00000") == 0)
        {
            DATA = getData(msg); //Uses memcpy to copy data from buffer into struct of DATA
        }
    } else

        {
            // Something went wrong
            getReceivedError(); //This just calls zmq_errno and cout the error
        }
        usleep(1);
    }
    }

int main (int argc, char*argv[])
{
//Doing some stuff...

void *subscriber_socket = connectoToHeadTrackerPublisher();

// Initialize Mux Lock

pthread_mutex_init(&receiverMutex, NULL);

// Initializing some variables...

// Launch Thread to get updates from windows machine
pthread_t publisherThread;
pthread_create(&publisherThread,
        NULL, getData, subscriber_socket);

// UI stuff

zmq_close(subscriber_socket);
return 0;
}

如果您无法提供解决方案,那么我会接受将问题确定为解决方案。我的主要问题是我没有消息或网络方面的知识或经验来正确识别问题。通常,如果我知道什么是错的,我可以解决它。

c++ multithreading zeromq
1个回答
0
投票

好的,这与信令/消息传递框架无关

你的Ubuntu代码指示ZeroMQ Context()-instance引擎创建一个新的SUB-socket实例,然后代码坚持这个套接字试图将_connect()(建立一个tcp://传输级连接到对等的对手)到“对面”接入点,坐在设置为localhost:port#的Ubuntu 10.10.10.15:5556的地址,而预期的PUB侧原型访问点实际上不在这个Ubuntu机器上,但在另一个,Windows主机,IP:port#其中是10.10.10.200:5556

这似乎是问题的根本原因,因此相应地更改它以匹配物理布局,您可以使玩具工作。

© www.soinside.com 2019 - 2024. All rights reserved.