我正在尝试创建一个可以使用 zmq 发送任何数据的 c++ 套接字库。服务器和客户端连接到一个临时的 REQ/RES 套接字,并使用 PUSH/PULL 传输数据。客户端可以连接并发送端口和ip之类的数据,但是之后无法传输数据。服务器可以识别客户端何时发送消息,但无法获取实际消息。当我从客户端发送数据时,它很好,但是当我将它转换为 Msg 结构时,数据无效,例如 senderId 通常为 0 或 25968,当我尝试与消息结构中的实际数据对象交互时,我发生(被信号 11 中断:SIGSEGV)崩溃。
networkUtil.h:
#pragma once
namespace myNetLib{
class Util{
public:
struct ClientInfoToServer{
std::string ip;
unsigned short dataPullPort;
float version;
};
struct ConnectionToServerInfo{
unsigned short dataPullPort;
unsigned short clientId;
};
};
}
ConnectionBaseInterface.h:
#include <zmq.hpp>
#define USING_MSG(x) using typename ConnectionBaseInterface<x>::Msg;
namespace myNetLib {
template<typename T>
class ConnectionBaseInterface {
public:
struct Msg {
T messageType;
void *data;
unsigned short receiverId;
unsigned short senderId;
bool wasHandled = false;
};
protected:
explicit ConnectionBaseInterface(int threads, float version) : version(version) {
context = new zmq::context_t(threads);
}
virtual void receiveMessage(Msg msg) = 0;
zmq::context_t *context;
float version;
};
}
ServerInterface.h:
#include <iostream>
#include <zmq.hpp>
#include "ConnectionBaseInterface.h"
#include "networkUtil.h"
#include <map>
#include <thread>
#pragma once
namespace myNetLib {
template<typename MessageTypes>
class ServerInterface : public ConnectionBaseInterface<MessageTypes> {
protected:
USING_MSG(MessageTypes)
explicit ServerInterface(float pVersion, bool pAutoHandleRequests,
unsigned short port = 5555, int threadCount = 1) :
ConnectionBaseInterface<MessageTypes>(threadCount, pVersion) {
connectionValidator = zmq::socket_t(*this->context, zmq::socket_type::rep);
connectionValidator.bind("tcp://*:" + std::to_string(port));
receiver = zmq::socket_t(*this->context, zmq::socket_type::pull);
receiver.bind("tcp://*:" + std::to_string(1251));
auto checkMessagesThread = new std::thread(&ServerInterface::receiveMessageThread, this);
checkMessagesThread->detach();
nextFreeClientId = 100;
autoHandleRequests = pAutoHandleRequests;
if (autoHandleRequests) {
auto connectionValidatorThread = new std::thread(&ServerInterface::connectionValidatorThread, this);
connectionValidatorThread->detach();
}
}
void sendMsgToAllClients(Msg msg, bool excludeSender = true) {
for (auto &client: clientPushSockets) {
std::cout << "Sending to client " << client.first << std::endl;
if (excludeSender && client.first == msg.senderId) {
continue;
}
if (!validateClientExistence(client.first)) {
clientPushSockets.erase(client.first);
continue;
}
auto replyMsg = msg;
replyMsg.receiverId = client.first;
zmq::message_t reply(static_cast<void *>(&replyMsg), sizeof(replyMsg));
client.second->send(reply, zmq::send_flags::none);
}
}
virtual void receiveMessage(Msg msg) = 0;
private:
zmq::socket_t connectionValidator;
zmq::socket_t receiver;
bool autoHandleRequests;
unsigned short nextFreeClientId;
std::map<unsigned short, zmq::socket_t *> clientPushSockets;
/*
Before the client can connect to the server it first needs to create a pull socket That the server can connect
to in the connection process.
During the connection process the server must create a pul socket that the client can connect to later.
*/
bool validateClientExistence(unsigned short clientId) {
if (!clientPushSockets.count(clientId)) {
std::cout << "Client with id " << clientId << " does not exist" << std::endl;
return false;
}
if (!clientPushSockets[clientId]) {
std::cout << "Client with id " << clientId << " does not have a push socket" << std::endl;
return false;
}
return true;
}
void sendMsgToClient(Msg msg, unsigned short clientId) {
zmq::message_t reply(static_cast<void *>(&msg), sizeof(msg));
if (validateClientExistence(clientId)) {
clientPushSockets[clientId]->send(reply, zmq::send_flags::none);
} else {
clientPushSockets.erase(clientId);
}
}
void handleMessageData(zmq::message_t *message) {
auto requestData = static_cast<Msg*>(message->data());
std::cout << "Received message from : " << requestData->senderId << std::endl;
if (requestData->receiverId < 100) {
/*
This makes it so that the if a client id is 0 it will be sent to all clients
and if it is 1 it will be sent to all clients except the sender
*/
if (requestData->receiverId < 2) {
sendMsgToAllClients(*requestData, requestData->receiverId == 1);
requestData->wasHandled = true;
}
} else {
sendMsgToClient(*requestData, requestData->receiverId);
requestData->wasHandled = true;
}
receiveMessage(*requestData);
}
[[noreturn]] void receiveMessageThread() {
while (true) {
zmq::message_t request;
std::cout << "Waiting for message" << std::endl;
auto res = receiver.recv(request, zmq::recv_flags::none);
std::cout << "Received message from: " << ((Msg*)request.data())->senderId << std::endl;
if (res.has_value()) {
handleMessageData(&request);
}
}
}
void connectionValidatorThread() {
std::cout << "Connection validator thread started" << std::endl;
while (true) {
zmq::message_t request;
auto res = connectionValidator.recv(request, zmq::recv_flags::none);
auto *requestData = static_cast<myNetLib::Util::ClientInfoToServer *>(request.data());
if (this->version != requestData->version || !res.has_value()) {
std::cout << "Client version mismatch" << std::endl;
auto connectionFailInfo = myNetLib::Util::ConnectionToServerInfo{0, 0};
zmq::message_t reply(static_cast<void *>(&connectionFailInfo), sizeof(connectionFailInfo));
connectionValidator.send(reply, zmq::send_flags::none);
continue;
}
std::cout << "Client accepted with ip " << requestData->ip << "and pullPort: "
<< requestData->dataPullPort << std::endl;
zmq::socket_t newClientPushSocket(*this->context, zmq::socket_type::push);
newClientPushSocket.connect(
"tcp://" + std::string(requestData->ip) + ":" + std::to_string(requestData->dataPullPort));
clientPushSockets[nextFreeClientId] = &newClientPushSocket;
auto connectionInfo = myNetLib::Util::ConnectionToServerInfo{1251, nextFreeClientId};
zmq::message_t reply(static_cast<void *>(&connectionInfo), sizeof(connectionInfo));
connectionValidator.send(reply, zmq::send_flags::none);
nextFreeClientId++;
}
}
public:
void checkForIncomingMessages() {
if (!autoHandleRequests) {
zmq::message_t request;
auto res = receiver.recv(request, zmq::recv_flags::dontwait);
if (res.has_value()) {
handleMessageData(&request);
}
}
}
bool shouldWait = true;
void update() {
while (shouldWait) {}
}
};
}
ClientInterface.h:
#include <random>
#include "ConnectionBaseInterface.h"
#include "networkUtil.h"
#include <iostream>
#include <thread>
#pragma once
namespace myNetLib {
template<typename MessageTypes>
class ClientInterface : public ConnectionBaseInterface<MessageTypes> {
USING_MSG(MessageTypes)
private:
int getFreePort() {
int port = 2782;
while (true) {
try {
zmq::socket_t testSoc(*this->context, zmq::socket_type::rep);
testSoc.bind("tcp://*:" + std::to_string(port));
return port;
} catch (zmq::error_t &e) {
std::cout << "Port " << port << " is already in use. Trying another port" << std::endl;
port++;
continue;
}
}
return 2782;
}
protected:
explicit ClientInterface(float pVersion, unsigned short serverPort, int pThreads = 1,
const std::string &serverIp = "localhost",
std::string myIp = "localhost", bool pAutoHandleRequests = true)
:
ConnectionBaseInterface<MessageTypes>(pThreads, pVersion) {
autoHandleRequests = pAutoHandleRequests;
zmq::socket_t connectionRequester(*this->context, zmq::socket_type::req);
connectionRequester.connect("tcp://" + serverIp + std::string(":") + std::to_string(serverPort));
int pullPort = getFreePort();
myNetLib::Util::ClientInfoToServer info = {myIp, static_cast<unsigned short>(pullPort), pVersion};
zmq::message_t request(static_cast<void *>(&info), sizeof(info));
connectionRequester.send(request, zmq::send_flags::none);
zmq::message_t reply;
auto res = connectionRequester.recv(reply, zmq::recv_flags::none);
myNetLib::Util::ConnectionToServerInfo connectionInfo = *static_cast<myNetLib::Util::ConnectionToServerInfo *>(reply.data());
if (connectionInfo.clientId == 0) {
if (!res.has_value()) {
std::cout << "Connection to server failed. Server did not respond" << std::endl;
exit(1);
}
std::cout << "Connection to server failed. Invalid version" << std::endl;
exit(1);
}
pullSocket = zmq::socket_t(*this->context, zmq::socket_type::pull);
pullSocket.bind("tcp://*:" + std::to_string(pullPort));
this->clientId = connectionInfo.clientId;
pushSocket = zmq::socket_t(*this->context, zmq::socket_type::push);
pushSocket.connect("tcp://" + serverIp + std::string(":") + std::to_string(connectionInfo.dataPullPort));
std::cout << "Server pull port: " << connectionInfo.dataPullPort << std::endl;
if (autoHandleRequests) {
auto receiveMessageThreadObj = new std::thread(&ClientInterface::receiveMessageThread, this);
receiveMessageThreadObj->detach();
}
}
template<typename A>
void sendMessage(MessageTypes messageType, A data, unsigned short receiverId) {
auto size = sizeof(MessageTypes) + sizeof(unsigned short) + sizeof(unsigned short ) + sizeof(bool) + sizeof(data);
if (receiverId < 100) {
if (receiverId == 0) {
sendMessage(Msg{messageType, data, 2, clientId, false}, size);
return;
}
std::cout << "Invalid receiver id: " << receiverId << std::endl;
return;
}
Msg msg = {messageType, data, receiverId, clientId};
sendMessage(msg, size);
}
void sendMessageToAllClients(MessageTypes messageType, void *data, bool excludeSelf = true) {
if (excludeSelf) {
sendMessage(messageType, data, 1);
} else {
sendMessage(messageType, data, 0);
}
}
virtual void receiveMessage(Msg msg) = 0;
unsigned short clientId;
private:
void sendMessage(Msg msg, size_t size) {
auto data = static_cast<void *>(&msg);
zmq::message_t request(data, size);
pushSocket.send(request, zmq::send_flags::none);
}
void receiveMessageThread() {
while (true) {
zmq::message_t message;
auto res = pullSocket.recv(message, zmq::recv_flags::none);
if (res.has_value()) {
auto *requestData = static_cast<Msg *>(message.data());
receiveMessage(*requestData);
}
}
}
bool autoHandleRequests;
zmq::socket_t pullSocket;
zmq::socket_t pushSocket;
std::random_device rd;
public:
void checkForIncomingMessages() {
if (!autoHandleRequests) {
std::cout << "Auto handle requests is disabled. Use receiveMessageThread() instead" << std::endl;
return;
}
zmq::message_t message;
auto res = pullSocket.recv(message, zmq::recv_flags::dontwait);
if (res.has_value()) {
auto *requestData = static_cast<Msg *>(message.data());
receiveMessage(*requestData);
}
}
};
}
服务器.cpp:
#include "ServerInterface.h"
#include "networkUtil.h"
#include <iostream>
#define VERSION 0.01
enum class MessageTypes : uint16_t {
PingServer,
MessageAll
};
class Server : public myNetLib::ServerInterface<MessageTypes> {
public:
explicit Server(int port = 5555) :
ServerInterface<MessageTypes>(VERSION, true, port) {}
void msgAll(std::string* msg){
std::cout << "Sending message to all" << std::endl;
sendMsgToAllClients(Msg{MessageTypes::MessageAll, static_cast<void*>(msg), 0, 0, false});
}
private:
void receiveMessage(ConnectionBaseInterface<MessageTypes>::Msg msg) override {
switch (msg.messageType) {
case MessageTypes::PingServer:
std::cout << "Received message from client:" << msg.senderId << std::endl;
break;
case MessageTypes::MessageAll:
if(msg.wasHandled){
auto msgString = *static_cast<int*>(msg.data);
std::cout << "Client: " << msg.senderId << " sent message to all: " << msgString << std::endl;
}else{
std::cout << "Message to all from client:" << msg.senderId << " wasn't handled" << std::endl;
}
break;
}
}
};
int main() {
Server server(5555);
while (true){
std::string messageToSend;
std::cin >> messageToSend;
}
return 0;
}
Client.cpp:
#include <iostream>
#include "../ClientInterface.h"
#define CLIENT_VERSION 0.01
enum class MessageTypes : uint16_t {
PingServer,
MessageAll
};
class Client : public myNetLib::ClientInterface<MessageTypes> {
public:
explicit Client(int serverPort = 5555, const std::string& serverIp = "localhost",
const std::string& myIp = "localhost") :
ClientInterface<MessageTypes>(CLIENT_VERSION, serverPort, 1, serverIp, myIp, true) {}
void msgServer(std::string* msg){
std::cout << "Sending message to server" << std::endl;
sendMessage(MessageTypes::PingServer, static_cast<void*>(msg), 0);
}
void msgAll(int msg){
std::cout << "Sending message to all" << std::endl;
std::cout << "Sending message to all: " << msg << std::endl;
sendMessageToAllClients(MessageTypes::MessageAll, static_cast<void*>(&msg), false);
}
private:
void receiveMessage(ConnectionBaseInterface<MessageTypes>::Msg msg) override {
switch (msg.messageType) {
case MessageTypes::PingServer:
std::cout << "Weird message" << std::endl;
break;
case MessageTypes::MessageAll:
auto message = static_cast<int*>(msg.data);
std::cout << "Message from client " << msg.senderId << ": " << *message << std::endl;
break;
}
}
};
int main(){
Client client(5555, "localhost", "localhost");
std::this_thread::sleep_for(std::chrono::seconds(4));
client.msgServer(new std::string("Hello server"));
while (true) {
std::string messageToSend;
std::cin >> messageToSend;
client.msgAll(123);
}
return 0;
}