Zmq 无效数据 c++ [关闭]

问题描述 投票:0回答:0

我正在尝试创建一个可以使用 zmq 发送任何数据的 c++ 套接字库。服务器和客户端连接到一个临时的 REQ/RES 套接字,并使用 PUSH/PULL 传输数据。客户端可以连接并发送端口和ip之类的数据,但是之后无法传输数据。服务器可以识别客户端何时发送消息,但无法获取实际消息。当我从客户端发送数据时,它很好,但是当我将它转换为 Msg 结构时,数据无效,例如 senderId 通常为 0 或 25968,当我尝试与消息结构中的实际数据对象交互时,我发生(被信号 11 中断:SIGSEGV)崩溃。

networkUtil.h:


#pragma once

namespace myNetLib{
    class Util{
    public:

            struct ClientInfoToServer{
                std::string ip;
                unsigned short dataPullPort;
                float version;
            };

            struct ConnectionToServerInfo{
                unsigned short dataPullPort;
                unsigned short clientId;
            };
    };
}

ConnectionBaseInterface.h:

#include <zmq.hpp>

#define USING_MSG(x) using typename ConnectionBaseInterface<x>::Msg;

namespace myNetLib {

    template<typename T>
    class ConnectionBaseInterface {

    public:
        struct Msg {
            T messageType;
            void *data;
            unsigned short receiverId;
            unsigned short senderId;
            bool wasHandled = false;
        };

    protected:
        explicit ConnectionBaseInterface(int threads, float version) : version(version) {
            context = new zmq::context_t(threads);
        }

        virtual void receiveMessage(Msg msg) = 0;

        zmq::context_t *context;
        float version;
    };

}

ServerInterface.h:

#include <iostream>
#include <zmq.hpp>
#include "ConnectionBaseInterface.h"
#include "networkUtil.h"
#include <map>
#include <thread>

#pragma once

namespace myNetLib {

    template<typename MessageTypes>
    class ServerInterface : public ConnectionBaseInterface<MessageTypes> {



    protected:

        USING_MSG(MessageTypes)
        explicit ServerInterface(float pVersion, bool pAutoHandleRequests,
                                 unsigned short port = 5555, int threadCount = 1) :
                ConnectionBaseInterface<MessageTypes>(threadCount, pVersion) {
            connectionValidator = zmq::socket_t(*this->context, zmq::socket_type::rep);
            connectionValidator.bind("tcp://*:" + std::to_string(port));

            receiver = zmq::socket_t(*this->context, zmq::socket_type::pull);
            receiver.bind("tcp://*:" + std::to_string(1251));

            auto checkMessagesThread = new std::thread(&ServerInterface::receiveMessageThread, this);
            checkMessagesThread->detach();

            nextFreeClientId = 100;
            autoHandleRequests = pAutoHandleRequests;
            if (autoHandleRequests) {
                auto connectionValidatorThread = new std::thread(&ServerInterface::connectionValidatorThread, this);
                connectionValidatorThread->detach();
            }
        }

        void sendMsgToAllClients(Msg msg, bool excludeSender = true) {
            for (auto &client: clientPushSockets) {
                std::cout << "Sending to client " << client.first << std::endl;
                if (excludeSender && client.first == msg.senderId) {
                    continue;
                }
                if (!validateClientExistence(client.first)) {
                    clientPushSockets.erase(client.first);
                    continue;
                }
                auto replyMsg = msg;
                replyMsg.receiverId = client.first;
                zmq::message_t reply(static_cast<void *>(&replyMsg), sizeof(replyMsg));
                client.second->send(reply, zmq::send_flags::none);
            }
        }

        virtual void receiveMessage(Msg msg) = 0;

    private:
        zmq::socket_t connectionValidator;
        zmq::socket_t receiver;
        bool autoHandleRequests;
        unsigned short nextFreeClientId;
        std::map<unsigned short, zmq::socket_t *> clientPushSockets;

        /*
        Before the client can connect to the server it first needs to create a pull socket That the server can connect
        to in the connection process.
        During the connection process the server must create a pul socket that the client can connect to later.
         */

        bool validateClientExistence(unsigned short clientId) {
            if (!clientPushSockets.count(clientId)) {
                std::cout << "Client with id " << clientId << " does not exist" << std::endl;
                return false;
            }
            if (!clientPushSockets[clientId]) {
                std::cout << "Client with id " << clientId << " does not have a push socket" << std::endl;
                return false;
            }

            return true;
        }

        void sendMsgToClient(Msg msg, unsigned short clientId) {
            zmq::message_t reply(static_cast<void *>(&msg), sizeof(msg));
            if (validateClientExistence(clientId)) {
                clientPushSockets[clientId]->send(reply, zmq::send_flags::none);
            } else {
                clientPushSockets.erase(clientId);
            }
        }

        void handleMessageData(zmq::message_t *message) {
            auto requestData = static_cast<Msg*>(message->data());
            std::cout << "Received message from : " << requestData->senderId << std::endl;
            if (requestData->receiverId < 100) {
                /*
                This makes it so that the if a client id is 0 it will be sent to all clients
                 and if it is 1 it will be sent to all clients except the sender
                 */
                if (requestData->receiverId < 2) {
                    sendMsgToAllClients(*requestData, requestData->receiverId == 1);
                    requestData->wasHandled = true;
                }
            } else {
                sendMsgToClient(*requestData, requestData->receiverId);
                requestData->wasHandled = true;
            }
            receiveMessage(*requestData);
        }

        [[noreturn]] void receiveMessageThread() {
            while (true) {
                zmq::message_t request;
                std::cout << "Waiting for message" << std::endl;
                auto res = receiver.recv(request, zmq::recv_flags::none);
                std::cout << "Received message from: " << ((Msg*)request.data())->senderId << std::endl;
                if (res.has_value()) {
                    handleMessageData(&request);
                }
            }
        }

        void connectionValidatorThread() {
            std::cout << "Connection validator thread started" << std::endl;
            while (true) {
                zmq::message_t request;
                auto res = connectionValidator.recv(request, zmq::recv_flags::none);
                auto *requestData = static_cast<myNetLib::Util::ClientInfoToServer *>(request.data());
                if (this->version != requestData->version || !res.has_value()) {
                    std::cout << "Client version mismatch" << std::endl;
                    auto connectionFailInfo = myNetLib::Util::ConnectionToServerInfo{0, 0};
                    zmq::message_t reply(static_cast<void *>(&connectionFailInfo), sizeof(connectionFailInfo));
                    connectionValidator.send(reply, zmq::send_flags::none);
                    continue;
                }
                std::cout << "Client accepted with ip " << requestData->ip << "and pullPort: "
                          << requestData->dataPullPort << std::endl;

                zmq::socket_t newClientPushSocket(*this->context, zmq::socket_type::push);
                newClientPushSocket.connect(
                        "tcp://" + std::string(requestData->ip) + ":" + std::to_string(requestData->dataPullPort));
                clientPushSockets[nextFreeClientId] = &newClientPushSocket;

                auto connectionInfo = myNetLib::Util::ConnectionToServerInfo{1251, nextFreeClientId};
                zmq::message_t reply(static_cast<void *>(&connectionInfo), sizeof(connectionInfo));
                connectionValidator.send(reply, zmq::send_flags::none);
                nextFreeClientId++;
            }
        }

    public:
        void checkForIncomingMessages() {
            if (!autoHandleRequests) {
                zmq::message_t request;
                auto res = receiver.recv(request, zmq::recv_flags::dontwait);
                if (res.has_value()) {
                    handleMessageData(&request);
                }
            }
        }

        bool shouldWait = true;

        void update() {
            while (shouldWait) {}
        }
    };

}

ClientInterface.h:


#include <random>
#include "ConnectionBaseInterface.h"
#include "networkUtil.h"
#include <iostream>
#include <thread>

#pragma once

namespace myNetLib {

    template<typename MessageTypes>
    class ClientInterface : public ConnectionBaseInterface<MessageTypes> {

        USING_MSG(MessageTypes)

    private:
        int getFreePort() {
            int port = 2782;
            while (true) {
                try {
                    zmq::socket_t testSoc(*this->context, zmq::socket_type::rep);
                    testSoc.bind("tcp://*:" + std::to_string(port));
                    return port;
                } catch (zmq::error_t &e) {
                    std::cout << "Port " << port << " is already in use. Trying another port" << std::endl;
                    port++;
                    continue;
                }
            }
            return 2782;
        }

    protected:

        explicit ClientInterface(float pVersion, unsigned short serverPort, int pThreads = 1,
                                 const std::string &serverIp = "localhost",
                                 std::string myIp = "localhost", bool pAutoHandleRequests = true)
                :
                ConnectionBaseInterface<MessageTypes>(pThreads, pVersion) {
            autoHandleRequests = pAutoHandleRequests;
            zmq::socket_t connectionRequester(*this->context, zmq::socket_type::req);
            connectionRequester.connect("tcp://" + serverIp + std::string(":") + std::to_string(serverPort));
            int pullPort = getFreePort();
            myNetLib::Util::ClientInfoToServer info = {myIp, static_cast<unsigned short>(pullPort), pVersion};
            zmq::message_t request(static_cast<void *>(&info), sizeof(info));
            connectionRequester.send(request, zmq::send_flags::none);
            zmq::message_t reply;
            auto res = connectionRequester.recv(reply, zmq::recv_flags::none);
            myNetLib::Util::ConnectionToServerInfo connectionInfo = *static_cast<myNetLib::Util::ConnectionToServerInfo *>(reply.data());
            if (connectionInfo.clientId == 0) {
                if (!res.has_value()) {
                    std::cout << "Connection to server failed. Server did not respond" << std::endl;
                    exit(1);
                }
                std::cout << "Connection to server failed. Invalid version" << std::endl;
                exit(1);
            }
            pullSocket = zmq::socket_t(*this->context, zmq::socket_type::pull);
            pullSocket.bind("tcp://*:" + std::to_string(pullPort));
            this->clientId = connectionInfo.clientId;
            pushSocket = zmq::socket_t(*this->context, zmq::socket_type::push);
            pushSocket.connect("tcp://" + serverIp + std::string(":") + std::to_string(connectionInfo.dataPullPort));
            std::cout << "Server pull port: " << connectionInfo.dataPullPort << std::endl;
            if (autoHandleRequests) {
                auto receiveMessageThreadObj = new std::thread(&ClientInterface::receiveMessageThread, this);
                receiveMessageThreadObj->detach();
            }
        }

        template<typename A>
        void sendMessage(MessageTypes messageType, A data, unsigned short receiverId) {
            auto size = sizeof(MessageTypes) + sizeof(unsigned short) + sizeof(unsigned short ) + sizeof(bool) + sizeof(data);
            if (receiverId < 100) {
                if (receiverId == 0) {
                    sendMessage(Msg{messageType, data, 2, clientId, false}, size);
                    return;
                }
                std::cout << "Invalid receiver id: " << receiverId << std::endl;
                return;
            }
            Msg msg = {messageType, data, receiverId, clientId};
            sendMessage(msg, size);
        }

        void sendMessageToAllClients(MessageTypes messageType, void *data, bool excludeSelf = true) {
            if (excludeSelf) {
                sendMessage(messageType, data, 1);
            } else {
                sendMessage(messageType, data, 0);
            }
        }

        virtual void receiveMessage(Msg msg) = 0;

        unsigned short clientId;

    private:

        void sendMessage(Msg msg, size_t size) {
            auto data = static_cast<void *>(&msg);
            zmq::message_t request(data, size);
            pushSocket.send(request, zmq::send_flags::none);
        }

        void receiveMessageThread() {
            while (true) {
                zmq::message_t message;
                auto res = pullSocket.recv(message, zmq::recv_flags::none);
                if (res.has_value()) {
                    auto *requestData = static_cast<Msg *>(message.data());
                    receiveMessage(*requestData);
                }
            }
        }

        bool autoHandleRequests;
        zmq::socket_t pullSocket;
        zmq::socket_t pushSocket;
        std::random_device rd;

    public:
        void checkForIncomingMessages() {
            if (!autoHandleRequests) {
                std::cout << "Auto handle requests is disabled. Use receiveMessageThread() instead" << std::endl;
                return;
            }
            zmq::message_t message;
            auto res = pullSocket.recv(message, zmq::recv_flags::dontwait);
            if (res.has_value()) {
                auto *requestData = static_cast<Msg *>(message.data());
                receiveMessage(*requestData);
            }
        }
    };
    
}

服务器.cpp:

#include "ServerInterface.h"
#include "networkUtil.h"
#include <iostream>

#define VERSION 0.01

enum class MessageTypes : uint16_t {
    PingServer,
    MessageAll
};

class Server : public myNetLib::ServerInterface<MessageTypes> {

public:
    explicit Server(int port = 5555) :
    ServerInterface<MessageTypes>(VERSION, true, port) {}

    void msgAll(std::string* msg){
        std::cout << "Sending message to all" << std::endl;
        sendMsgToAllClients(Msg{MessageTypes::MessageAll, static_cast<void*>(msg), 0, 0, false});
    }


private:
    void receiveMessage(ConnectionBaseInterface<MessageTypes>::Msg msg) override {
        switch (msg.messageType) {

            case MessageTypes::PingServer:
                std::cout << "Received message from client:" << msg.senderId << std::endl;
                break;
            case MessageTypes::MessageAll:
                if(msg.wasHandled){

                    auto msgString = *static_cast<int*>(msg.data);
                    std::cout << "Client: " << msg.senderId << " sent message to all: " << msgString << std::endl;
                }else{
                    std::cout << "Message to all from client:" << msg.senderId << " wasn't handled" << std::endl;
                }
                break;
        }
    }
};

int main() {
    Server server(5555);
    while (true){
        std::string messageToSend;
        std::cin >> messageToSend;
    }
    return 0;
}

Client.cpp:

#include <iostream>
#include "../ClientInterface.h"

#define CLIENT_VERSION 0.01

enum class MessageTypes : uint16_t {
    PingServer,
    MessageAll
};

class Client : public myNetLib::ClientInterface<MessageTypes> {

public:
    explicit Client(int serverPort = 5555, const std::string& serverIp = "localhost",
                    const std::string& myIp = "localhost") :
            ClientInterface<MessageTypes>(CLIENT_VERSION, serverPort, 1, serverIp, myIp, true) {}

    void msgServer(std::string* msg){
        std::cout << "Sending message to server" << std::endl;
        sendMessage(MessageTypes::PingServer, static_cast<void*>(msg), 0);
    }

    void msgAll(int msg){
        std::cout << "Sending message to all" << std::endl;
        std::cout << "Sending message to all: " << msg << std::endl;
        sendMessageToAllClients(MessageTypes::MessageAll, static_cast<void*>(&msg), false);
    }

private:
    void receiveMessage(ConnectionBaseInterface<MessageTypes>::Msg msg) override {
        switch (msg.messageType) {
            case MessageTypes::PingServer:
                std::cout << "Weird message" << std::endl;
                break;
            case MessageTypes::MessageAll:
                auto message = static_cast<int*>(msg.data);
                std::cout << "Message from client " << msg.senderId << ": " << *message << std::endl;
                break;
        }
    }
};


int main(){
    Client client(5555, "localhost", "localhost");
    std::this_thread::sleep_for(std::chrono::seconds(4));
    client.msgServer(new std::string("Hello server"));
    while (true) {
        std::string messageToSend;
        std::cin >> messageToSend;
        client.msgAll(123);
    }
    return 0;
}
c++ zeromq
© www.soinside.com 2019 - 2024. All rights reserved.