接收客户端发送的数据时发生错误:remote_endpoint: 传输端点未连接

问题描述 投票:0回答:1

我使用 Boost.Asio 创建了通过 TCP 通信的服务器和客户端。服务器从单个或少量客户端接收信息时没有问题,但如果每秒从大量客户端(大约 100 个或更多)接收信息,随后就会出现错误消息“remote_endpoint: 传输端点未连接”。我想修复这个错误。

这是我的代码

#include <array>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>

#include <boost/asio.hpp>
#include <mysql/mysql.h>
#include <nlohmann/json.hpp>

using namespace boost::asio;
using json = nlohmann::json;

class AsyncTCPServer {
public:
    // Constructor
    AsyncTCPServer(io_service& io_service, short port)
        : acceptor_(io_service, ip::tcp::endpoint(ip::tcp::v4(), port)),
        socket_(io_service) {
            StartAccept();
            ConnectToMariaDB();
    }
    // Destructor
    ~AsyncTCPServer() {
        // Disconnect from MariaDB
        if (db_connection_ != nullptr) {
            mysql_close(db_connection_);
            std::cout << "MariaDB connection closed" << std::endl;
        }
    }

private:
    // Start accepting new client connections and initiate asynchronous communication
    void StartAccept() {
        acceptor_.async_accept(socket_,
            [this](boost::system::error_code ec) {
                if(ec) {
                    std::cerr << "Error occurred during connection acceptance: " << ec.message() << std::endl;
                } else {
                    // Add the new client socket to the array for management
                    AddClient(std::make_shared<ip::tcp::socket>(std::move(socket_)));
                    // Wait for the next client connection
                    StartAccept();
                    // Initiate asynchronous communication for the current client
                    StartRead(clients_[num_clients_ - 1]);
                }
            });
    }
 
    // Add a new client to the array and print a connection message
    void AddClient(std::shared_ptr<ip::tcp::socket> client) {
        std::lock_guard<std::mutex> lock(mutex_);
    
        if (num_clients_ < max_clients) {
            clients_[num_clients_] = client;
            num_clients_++;
    
            // Print connection message
            std::cout << client->remote_endpoint().address().to_string() + " connected." << std::endl;
        } else {
            std::cerr << "Cannot accept connection, maximum number of clients exceeded." << std::endl;
            client->close();
        }
    }
 
    // Start asynchronous reading for the client
    void StartRead(std::shared_ptr<ip::tcp::socket> client) {
        auto& buffer = buffers_[client];  // Get the buffer associated with the client
        async_read_until(*client, buffer, '\n',
            [this, client, &buffer](boost::system::error_code ec, std::size_t length) {
                if (ec) {
                    RemoveClient(client);
                } else {
                    std::istream is(&buffer);
                    std::string message;
                    std::getline(is, message);
                    // Save to the database
                    SaveToDatabase(message);
                    StartRead(client);
                }
            });
    }

    // Connect to MariaDB
    void ConnectToMariaDB() {
        // Initialize MariaDB connection
        db_connection_ = mysql_init(nullptr);
        if (db_connection_ == nullptr) {
            std::cerr << "Failed to initialize MariaDB connection" << std::endl;
            exit(1);
        }

        const char* host = "localhost";
        const char* user = "root";
        const char* password = "1234";
        const char* database = "servertest";
        unsigned int port = 3306;

        if (mysql_real_connect(db_connection_, host, user, password, database, port, nullptr, 0) == nullptr) {
            std::cerr << "Failed to connect to MariaDB: " << mysql_error(db_connection_) << std::endl;
            exit(1);
        }

        std::cout << "MariaDB connection established" << std::endl;
    }

    // Save message to the database
    void SaveToDatabase(const std::string& message) {
       // Assume JSON and save to the database
       try {
          auto j = json::parse(message);
          std::lock_guard<std::mutex> lock(mutex_);
            
          if (j.find("CPU") != j.end()) {
             SaveCpuToDatabase(j["CPU"]);
          }
          if (j.find("NIC") != j.end()) {
             SaveNicToDatabase(j["NIC"]);
          }
          if (j.find("Memory") != j.end()) {
             SaveMemoryToDatabase(j["Memory"]);
          }
          if (j.find("Disk") != j.end()) {
             SaveDiskToDatabase(j["Disk"]);
          }
          std::cout << "Saved to the database" << std::endl;
       } catch (const nlohmann::detail::parse_error& e) { // Catch JSON parsing errors
          std::cerr << "JSON parsing error: " << e.what() << std::endl;
          std::cerr << "Error occurred in the message: " << message << std::endl;
       } catch (const std::exception& e) {
          std::cerr << "Failed to save to the database: " << e.what() << std::endl;
          std::cerr << "Error occurred in the message: " << message << std::endl;
       }
    }
 
    void SaveCpuToDatabase(const json& cpuData) {
        for (const auto& processor : cpuData) {
            // Extract information for each processor
            std::string cores = processor["Cores"].get<std::string>();
            std::string model = processor["Model"].get<std::string>();
            std::string siblings = processor["Siblings"].get<std::string>();
            
            // Generate query and save data to the DB
            std::string query = "INSERT INTO cpu_table (cores, model, siblings) VALUES ('" + cores + "', '" + model + "', '" + siblings + "')";
            if (mysql_query(db_connection_, query.c_str()) != 0) {
                std::cerr << "Error occurred while saving CPU information to the database: " << mysql_error(db_connection_) << std::endl;
            }
        }
    }
 
    void SaveNicToDatabase(const json& nicData) {
        for (const auto& nic : nicData) {
            // Extract information for each NIC
            std::string interface = nic["Interface"].get<std::string>();
            std::string mac_address = nic["MAC Address"].get<std::string>();
            std::string operational_state = nic["Operational State"].get<std::string>();
            std::string speed = nic["Speed"].get<std::string>();

            // Generate query and save data to the DB
            std::string query = "INSERT INTO nic_table (interface, mac_address, operational_state, speed) VALUES ('" + interface + "', '" + mac_address + "', '" + operational_state + "', '" + speed + "')";
            int queryResult = mysql_query(db_connection_, query.c_str());
            if (queryResult != 0) {
                std::cerr << "Error occurred while saving NIC information to the database: " << mysql_error(db_connection_) << std::endl;
            }
        }
    }
 
    void SaveMemoryToDatabase(const json& memoryData) {
        // Similar logic for saving memory information to the database
        // ...
    }
 
    void SaveDiskToDatabase(const json& diskData) {
        // Similar logic for saving disk information to the database
        // ...
    }
   
    // Remove client from the array and print a connection termination message
    void RemoveClient(std::shared_ptr<ip::tcp::socket> client) {
        for (int i = 0; i < max_clients; ++i) {
            if (clients_[i] == client) {
                clients_[i] = nullptr;
                num_clients_--;
                // Print connection termination message
                std::cout << client->remote_endpoint().address().to_string() + " connection terminated." << std::endl;

                boost::system::error_code ec;
                client->shutdown(ip::tcp::socket::shutdown_both, ec);
                client->close(ec);

                break;
            }
        }
    }

    ip::tcp::acceptor acceptor_; // TCP acceptor
    ip::tcp::socket socket_; // TCP socket
    static const int max_clients = 100000;  // Maximum number of clients
    std::array<std::shared_ptr<ip::tcp::socket>, max_clients> clients_; // TCP sockets corresponding to the maximum number of clients
    int num_clients_ = 0; // Current number of connected clients
    std::mutex mutex_;
    // MariaDB connection handler
    MYSQL* db_connection_;
    std::map<std::shared_ptr<ip::tcp::socket>, streambuf> buffers_;  // Map to manage buffers for each client
};

/**
 * Runs the server on port 12345 until the event loop ends.
 *
 * @return 0 when the event loop finishes normally, 1 when an exception
 *         terminated the server (so the failure is visible to the caller
 *         of the process instead of being reported as success).
 */
int main() {
    const std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
    try {
        boost::asio::io_service io_service; // Create io_service object
        AsyncTCPServer server(io_service, 12345); // Create the server (io_service, port number)
        io_service.run(); // Start the event loop
    } catch (const std::exception& e) {
        std::cerr << "Exception caught: " << e.what() << std::endl;
        // Report how long the server ran before it failed.
        const std::chrono::duration<double> sec = std::chrono::system_clock::now() - start;
        std::cout << "Time taken to run (seconds): " << sec.count() << " seconds" << std::endl;
        return 1;
    }

    return 0;
}
c++ linux boost-asio rocky-os
1个回答
0
投票

如果套接字不再连接,则

remote_endpoint()
访问器会引发错误。这也包括对端已经关闭连接的情况。

一般来说,当操作因错误条件而终止时,您无法使用它。

保留端点(例如用于日志记录目的)的常用方法是在新接受连接时保存它。所以而不是

std::array<std::shared_ptr<ip::tcp::socket>, max_clients> clients_;

你可能有

// Per-connection record pairing a client socket with its peer endpoint.
// Per the surrounding answer, the endpoint is meant to be stored when the
// connection is accepted so it stays available (e.g. for logging) after the
// socket is no longer connected.
struct Client {
    ip::tcp::socket   socket;           // the client's TCP socket
    ip::tcp::endpoint remote_endpoint;  // peer endpoint saved separately from the socket
};

std::array<std::shared_ptr<Client>, max_clients> clients_;

重构

事实上,这段代码的组织方式更像典型的 C 风格 select/poll 代码,而不是典型的 Asio 代码。

Client
将是封装服务器所有特定于会话的逻辑的完美候选者。

下面展示重构后的代码,它在以下各部分之间分离了关注点:

  • Database
    知道如何保存消息并拥有连接
  • Server
    接受套接字连接并拥有一个
    Database
    实例
  • Session
    拥有单个客户端套接字和相关资源(如缓冲区),它获取对
    Database
    的引用以进行消息处理

较小的变化

  • 它避免了
    streambuf
    istream
    处理(成本高昂)
  • 它显式使用
    length
    (而不是通过
    std::getline
    隐式使用)
  • 它避免了过程中的缓冲区复制
  • 它避免了不必要的加锁(您的服务器是单线程的,因此存在隐式 strand)
  • 我选择保留
    Database
    中的加锁,这样它仍然是线程感知的,以防您在其他地方使用它
  • 我更改了手动数组操作,以将固定大小的容器模拟为 boost
    flat_set
    ,而不是
    static_vector
    。这消除了几类内存管理和异常安全错误
  • 删除了“RemoveClient”,它很容易看起来像
     // Removes session `c` from sessions_ and returns the result of erase().
     // NOTE(review): sessions_ is declared outside this snippet; presumably it
     // is the set of live sessions, in which case the result is the number of
     // entries removed (0 or 1) - confirm against the container type.
     size_t RemoveClient(std::shared_ptr<Session> const& c) {
         return sessions_.erase(c);
     }
    
    如果你愿意的话
  • 相反,我们依靠
    enable_shared_from_this
    来管理
    Session
    生命周期,这意味着我们可以根据需要删除过期的会话。

备注:

  • 对于堆栈分配的对象来说,100'000 个连接是很大的。我可能根本不会使用静态容器
  • 100'000 个连接无法通过单个
    MYSQL
    连接进行扩展。相反,您可能希望把消息放入队列,由多个数据库工作线程并发地保存到数据库——最好是在集群上,以便均衡负载
  • 存在明显的 SQL 注入风险。考虑使用
    Boost Mysql,它支持异步操作和预处理语句(prepared statements)
  • 避免使用
    detail
    命名空间
  • 文件

    Database.h

    
    

    #pragma once #include <nlohmann/json.hpp> #include <string_view> using nlohmann::json; struct Database { Database(); ~Database(); void SaveToDatabase(std::string_view message); private: // Connect to MariaDB void ConnectToMariaDB(); void SaveCpuToDatabase(json const& cpuData); void SaveNicToDatabase(json const& nicData); void SaveMemoryToDatabase(json const& memoryData); void SaveDiskToDatabase(json const& diskData); // MariaDB connection handle std::mutex mutex_; struct MYSQL* handle_ = nullptr; };
    
    
  • 文件

    Database.cpp

    
    

    #include "Database.h"

    #include <iostream>
    #include <mutex>
    #include <stdexcept>
    #include <string>

    #include <mysql/mysql.h>

    using namespace std::string_literals;

    namespace {

    // Escapes `value` for use inside a quoted SQL string literal on `handle`.
    // Message fields come from remote clients and must not be concatenated
    // into a statement unescaped. Throws std::runtime_error on failure.
    std::string Escape(MYSQL* handle, std::string const& value) {
        // The API requires room for 2 * length + 1 bytes.
        std::string escaped(value.size() * 2 + 1, '\0');
        unsigned long const written = ::mysql_real_escape_string(
            handle, escaped.data(), value.data(), static_cast<unsigned long>(value.size()));
        if (written == static_cast<unsigned long>(-1))
            throw std::runtime_error("Failed to escape value for SQL statement");
        escaped.resize(written);
        return escaped;
    }

    } // namespace

    Database::Database() { ConnectToMariaDB(); }

    Database::~Database() {
        if (handle_) {
            ::mysql_close(handle_);
            std::cout << "MariaDB connection closed" << std::endl;
        }
    }

    // Save message to the database.
    // Parse errors and malformed sections are logged; nothing is thrown to the caller.
    void Database::SaveToDatabase(std::string_view message) {
        // Assume JSON and save to the database
        try {
            json j = json::parse(message);
            std::lock_guard lock(mutex_);

            if (auto it = j.find("CPU"); it != j.end())
                SaveCpuToDatabase(*it);
            if (auto it = j.find("NIC"); it != j.end())
                SaveNicToDatabase(*it);
            if (auto it = j.find("Memory"); it != j.end())
                SaveMemoryToDatabase(*it);
            if (auto it = j.find("Disk"); it != j.end())
                SaveDiskToDatabase(*it);
            std::cout << "Saved to the database" << std::endl;
        } catch (nlohmann::json::parse_error const& e) { // Catch JSON parsing errors
            std::cerr << "JSON parsing error: " << e.what() << std::endl;
            std::cerr << "Error occurred in the message: " << message << std::endl;
        } catch (std::exception const& e) {
            std::cerr << "Failed to save to the database: " << e.what() << std::endl;
            std::cerr << "Error occurred in the message: " << message << std::endl;
        }
    }

    // Opens the connection; throws std::runtime_error on failure.
    void Database::ConnectToMariaDB() {
        // Initialize MariaDB connection
        handle_ = ::mysql_init(nullptr);
        if (!handle_)
            throw std::runtime_error("Failed to initialize MariaDB connection");

        char const*  host     = "localhost";
        char const*  user     = "root";
        char const*  password = "1234";
        char const*  database = "servertest";
        unsigned int port     = 3306;

        if (::mysql_real_connect(handle_, host, user, password, database, port, nullptr, 0) == nullptr) {
            // This runs inside the constructor, so the destructor will not run
            // after the throw: release the handle here to avoid leaking it.
            std::string const error = "Failed to connect to MariaDB: "s + ::mysql_error(handle_);
            ::mysql_close(handle_);
            handle_ = nullptr;
            throw std::runtime_error(error);
        }

        std::cout << "MariaDB connection established" << std::endl;
    }

    // Inserts one row into cpu_table per element of cpuData.
    void Database::SaveCpuToDatabase(json const& cpuData) {
        for (auto const& processor : cpuData) {
            // at() throws on a missing key; const operator[] would be undefined behaviour.
            std::string const cores    = Escape(handle_, processor.at("Cores").get<std::string>());
            std::string const model    = Escape(handle_, processor.at("Model").get<std::string>());
            std::string const siblings = Escape(handle_, processor.at("Siblings").get<std::string>());

            // Generate query and save data to the DB
            std::string const query = "INSERT INTO cpu_table (cores, model, siblings) VALUES ('" + cores + "', '" + model + "', '" + siblings + "')";
            if (::mysql_query(handle_, query.c_str()) != 0) {
                std::cerr << "Error occurred while saving CPU information to the database: " << ::mysql_error(handle_) << std::endl;
            }
        }
    }

    // Inserts one row into nic_table per element of nicData.
    void Database::SaveNicToDatabase(json const& nicData) {
        for (auto const& nic : nicData) {
            std::string const interface         = Escape(handle_, nic.at("Interface").get<std::string>());
            std::string const mac_address       = Escape(handle_, nic.at("MAC Address").get<std::string>());
            std::string const operational_state = Escape(handle_, nic.at("Operational State").get<std::string>());
            std::string const speed             = Escape(handle_, nic.at("Speed").get<std::string>());

            // Generate query and save data to the DB
            std::string const query = "INSERT INTO nic_table (interface, mac_address, operational_state, speed) VALUES ('" + interface + "', '" + mac_address + "', '" + operational_state + "', '" + speed + "')";
            if (::mysql_query(handle_, query.c_str()) != 0) {
                std::cerr << "Error occurred while saving NIC information to the database: " << ::mysql_error(handle_) << std::endl;
            }
        }
    }

    void Database::SaveMemoryToDatabase(json const& /*memoryData*/) {
        // Similar logic for saving memory information to the database
        // ...
    }

    void Database::SaveDiskToDatabase(json const& /*diskData*/) {
        // Similar logic for saving disk information to the database
        // ...
    }
    
    
© www.soinside.com 2019 - 2024. All rights reserved.