我使用 Boost.Asio 创建了通过 TCP 进行通信的服务器和客户端。服务器从单个或少量客户端接收信息时没有问题,但如果每秒从大量客户端(大约 100 个或更多)接收信息,它随后就会报出错误消息“remote_endpoint: Transport endpoint is not connected”(传输端点未连接)。我想修复这个错误。
这是我的代码:
#include <array>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <boost/asio.hpp>
#include <mysql/mysql.h>
#include <nlohmann/json.hpp>
using namespace boost::asio;
using json = nlohmann::json;
class AsyncTCPServer {
public:
// Constructor
AsyncTCPServer(io_service& io_service, short port)
: acceptor_(io_service, ip::tcp::endpoint(ip::tcp::v4(), port)),
socket_(io_service) {
StartAccept();
ConnectToMariaDB();
}
// Destructor
~AsyncTCPServer() {
// Disconnect from MariaDB
if (db_connection_ != nullptr) {
mysql_close(db_connection_);
std::cout << "MariaDB connection closed" << std::endl;
}
}
private:
// Start accepting new client connections and initiate asynchronous communication
void StartAccept() {
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec) {
if(ec) {
std::cerr << "Error occurred during connection acceptance: " << ec.message() << std::endl;
} else {
// Add the new client socket to the array for management
AddClient(std::make_shared<ip::tcp::socket>(std::move(socket_)));
// Wait for the next client connection
StartAccept();
// Initiate asynchronous communication for the current client
StartRead(clients_[num_clients_ - 1]);
}
});
}
// Add a new client to the array and print a connection message
void AddClient(std::shared_ptr<ip::tcp::socket> client) {
std::lock_guard<std::mutex> lock(mutex_);
if (num_clients_ < max_clients) {
clients_[num_clients_] = client;
num_clients_++;
// Print connection message
std::cout << client->remote_endpoint().address().to_string() + " connected." << std::endl;
} else {
std::cerr << "Cannot accept connection, maximum number of clients exceeded." << std::endl;
client->close();
}
}
// Start asynchronous reading for the client
void StartRead(std::shared_ptr<ip::tcp::socket> client) {
auto& buffer = buffers_[client]; // Get the buffer associated with the client
async_read_until(*client, buffer, '\n',
[this, client, &buffer](boost::system::error_code ec, std::size_t length) {
if (ec) {
RemoveClient(client);
} else {
std::istream is(&buffer);
std::string message;
std::getline(is, message);
// Save to the database
SaveToDatabase(message);
StartRead(client);
}
});
}
// Connect to MariaDB
void ConnectToMariaDB() {
// Initialize MariaDB connection
db_connection_ = mysql_init(nullptr);
if (db_connection_ == nullptr) {
std::cerr << "Failed to initialize MariaDB connection" << std::endl;
exit(1);
}
const char* host = "localhost";
const char* user = "root";
const char* password = "1234";
const char* database = "servertest";
unsigned int port = 3306;
if (mysql_real_connect(db_connection_, host, user, password, database, port, nullptr, 0) == nullptr) {
std::cerr << "Failed to connect to MariaDB: " << mysql_error(db_connection_) << std::endl;
exit(1);
}
std::cout << "MariaDB connection established" << std::endl;
}
// Save message to the database
void SaveToDatabase(const std::string& message) {
// Assume JSON and save to the database
try {
auto j = json::parse(message);
std::lock_guard<std::mutex> lock(mutex_);
if (j.find("CPU") != j.end()) {
SaveCpuToDatabase(j["CPU"]);
}
if (j.find("NIC") != j.end()) {
SaveNicToDatabase(j["NIC"]);
}
if (j.find("Memory") != j.end()) {
SaveMemoryToDatabase(j["Memory"]);
}
if (j.find("Disk") != j.end()) {
SaveDiskToDatabase(j["Disk"]);
}
std::cout << "Saved to the database" << std::endl;
} catch (const nlohmann::detail::parse_error& e) { // Catch JSON parsing errors
std::cerr << "JSON parsing error: " << e.what() << std::endl;
std::cerr << "Error occurred in the message: " << message << std::endl;
} catch (const std::exception& e) {
std::cerr << "Failed to save to the database: " << e.what() << std::endl;
std::cerr << "Error occurred in the message: " << message << std::endl;
}
}
void SaveCpuToDatabase(const json& cpuData) {
for (const auto& processor : cpuData) {
// Extract information for each processor
std::string cores = processor["Cores"].get<std::string>();
std::string model = processor["Model"].get<std::string>();
std::string siblings = processor["Siblings"].get<std::string>();
// Generate query and save data to the DB
std::string query = "INSERT INTO cpu_table (cores, model, siblings) VALUES ('" + cores + "', '" + model + "', '" + siblings + "')";
if (mysql_query(db_connection_, query.c_str()) != 0) {
std::cerr << "Error occurred while saving CPU information to the database: " << mysql_error(db_connection_) << std::endl;
}
}
}
void SaveNicToDatabase(const json& nicData) {
for (const auto& nic : nicData) {
// Extract information for each NIC
std::string interface = nic["Interface"].get<std::string>();
std::string mac_address = nic["MAC Address"].get<std::string>();
std::string operational_state = nic["Operational State"].get<std::string>();
std::string speed = nic["Speed"].get<std::string>();
// Generate query and save data to the DB
std::string query = "INSERT INTO nic_table (interface, mac_address, operational_state, speed) VALUES ('" + interface + "', '" + mac_address + "', '" + operational_state + "', '" + speed + "')";
int queryResult = mysql_query(db_connection_, query.c_str());
if (queryResult != 0) {
std::cerr << "Error occurred while saving NIC information to the database: " << mysql_error(db_connection_) << std::endl;
}
}
}
void SaveMemoryToDatabase(const json& memoryData) {
// Similar logic for saving memory information to the database
// ...
}
void SaveDiskToDatabase(const json& diskData) {
// Similar logic for saving disk information to the database
// ...
}
// Remove client from the array and print a connection termination message
void RemoveClient(std::shared_ptr<ip::tcp::socket> client) {
for (int i = 0; i < max_clients; ++i) {
if (clients_[i] == client) {
clients_[i] = nullptr;
num_clients_--;
// Print connection termination message
std::cout << client->remote_endpoint().address().to_string() + " connection terminated." << std::endl;
boost::system::error_code ec;
client->shutdown(ip::tcp::socket::shutdown_both, ec);
client->close(ec);
break;
}
}
}
ip::tcp::acceptor acceptor_; // TCP acceptor
ip::tcp::socket socket_; // TCP socket
static const int max_clients = 100000; // Maximum number of clients
std::array<std::shared_ptr<ip::tcp::socket>, max_clients> clients_; // TCP sockets corresponding to the maximum number of clients
int num_clients_ = 0; // Current number of connected clients
std::mutex mutex_;
// MariaDB connection handler
MYSQL* db_connection_;
std::map<std::shared_ptr<ip::tcp::socket>, streambuf> buffers_; // Map to manage buffers for each client
};
int main() {
std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
try {
boost::asio::io_service io_service; // Create io_service object
AsyncTCPServer server(io_service, 12345); // Create an object of the AsyncTCPServer class (object, port number)
io_service.run(); // Start the event loop
} catch (std::exception& e) {
std::cerr << "Exception caught: " << e.what() << std::endl;
std::chrono::duration<double> sec = std::chrono::system_clock::now() - start;
std::cout << "Time taken to run (seconds): " << sec.count() << " seconds" << std::endl;
}
return 0;
}
如果套接字已不再处于连接状态,remote_endpoint() 访问器就会引发错误。这也包括对等方已关闭连接的情况。
一般来说,当操作因错误条件而终止时,您无法使用它。
保留端点(例如用于日志记录)的常用方法,是在刚接受新连接时就把它保存下来。因此,与其使用
std::array<std::shared_ptr<ip::tcp::socket>, max_clients> clients_;
不如改用
// Per-connection state: the client socket together with the peer endpoint
// recorded for it.
struct Client {
ip::tcp::socket socket; // the client's connection
ip::tcp::endpoint remote_endpoint; // peer endpoint; meant to be saved when the connection is accepted
};
std::array<std::shared_ptr<Client>, max_clients> clients_;
事实上,这段代码的组织方式更像典型的 C 风格 select/poll 代码,而不是典型的 Asio 代码。Client 正是封装服务器中所有会话相关逻辑的理想候选者。
下面是重构后的代码,它把以下各部分的关注点分离开来:
- Database:知道如何保存消息,并拥有数据库连接
- Server:接受套接字连接,并拥有一个 Database 实例
- Session:拥有单个客户端套接字和相关资源(如缓冲区),它获取对 Database 的引用以进行消息处理

较小的变化:
- 不再使用 streambuf 配合 istream 进行处理(成本高昂)
- 直接使用 length(而不是通过 std::getline 隐式使用)
- 将互斥锁移入 Database,这样它就可以保持线程感知,以防您在不同的地方使用它
- 使用 flat_set,而不是 static_vector。这消除了几类内存管理和异常安全错误:

size_t RemoveClient(std::shared_ptr<Session> const& c) {
return sessions_.erase(c);
}

- 如果你愿意的话,可以使用 enable_shared_from_this 来管理 Session 生命周期,这意味着我们可以根据需要删除过期的会话。

备注:
- 无法仅靠单个 MYSQL 连接进行扩展。相反,您可能希望对消息进行排队,以供多个数据库工作线程使用,这些工作线程并发地保存到数据库——最好在集群上,以便平衡负载
- 不要使用 detail 命名空间
Database.h
#pragma once
#include <nlohmann/json.hpp>
#include <string_view>
using nlohmann::json;
// Owns one MariaDB connection and stores incoming JSON messages through it.
// The connection is opened by the constructor and closed by the destructor
// (see Database.cpp).
// NOTE(review): std::mutex is used below but <mutex> is not included by this
// header directly — confirm it is available through another include.
struct Database {
Database();
~Database();
// Parses `message` as JSON and saves the sections it recognises.
void SaveToDatabase(std::string_view message);
private:
// Connect to MariaDB
void ConnectToMariaDB();
// One saver per top-level JSON section ("CPU", "NIC", "Memory", "Disk").
void SaveCpuToDatabase(json const& cpuData);
void SaveNicToDatabase(json const& nicData);
void SaveMemoryToDatabase(json const& memoryData);
void SaveDiskToDatabase(json const& diskData);
// Locked by SaveToDatabase while the savers run.
std::mutex mutex_;
// MariaDB connection handle. Declared via an elaborated type specifier so
// this header does not need <mysql/mysql.h>.
// NOTE(review): this only matches client headers where MYSQL is itself a
// struct tag; verify against the installed mysql.h.
struct MYSQL* handle_ = nullptr;
};
Database.cpp
#include "Database.h"
#include <iostream>
#include <mysql/mysql.h>
using namespace std::string_literals;
// Construction opens the MariaDB connection; any exception thrown by
// ConnectToMariaDB() propagates to the caller.
Database::Database() {
    ConnectToMariaDB();
}
// Closes the MariaDB connection, if there is one, and reports that it did so.
Database::~Database() {
    if (handle_ == nullptr) {
        return;  // nothing was opened
    }
    ::mysql_close(handle_);
    std::cout << "MariaDB connection closed" << std::endl;
}
// Save message to the database
void Database::SaveToDatabase(std::string_view message) {
// Assume JSON and save to the database
try {
json j = json::parse(message);
std::lock_guard lock(mutex_);
if (auto it = j.find("CPU"); it != j.end())
SaveCpuToDatabase(*it);
if (auto it = j.find("NIC"); it != j.end())
SaveNicToDatabase(*it);
if (auto it = j.find("Memory"); it != j.end())
SaveMemoryToDatabase(*it);
if (auto it = j.find("Disk"); it != j.end())
SaveDiskToDatabase(*it);
std::cout << "Saved to the database" << std::endl;
} catch (nlohmann::json::parse_error const& e) { // Catch JSON parsing errors
std::cerr << "JSON parsing error: " << e.what() << std::endl;
std::cerr << "Error occurred in the message: " << message << std::endl;
} catch (std::exception const& e) {
std::cerr << "Failed to save to the database: " << e.what() << std::endl;
std::cerr << "Error occurred in the message: " << message << std::endl;
}
}
void Database::ConnectToMariaDB() {
// Initialize MariaDB connection
handle_ = ::mysql_init(nullptr);
if (!handle_)
throw std::runtime_error("Failed to initialize MariaDB connection");
char const* host = "localhost";
char const* user = "root";
char const* password = "1234";
char const* database = "servertest";
unsigned int port = 3306;
if (::mysql_real_connect(handle_, host, user, password, database, port, nullptr, 0) == nullptr)
throw std::runtime_error("Failed to connect to MariaDB: "s + ::mysql_error(handle_));
std::cout << "MariaDB connection established" << std::endl;
}
// Inserts one row into cpu_table for every element of `cpuData`.
//
// Each element must provide string values for "Cores", "Model" and
// "Siblings"; a missing key or a non-string value raises a json exception
// (at() is used instead of operator[], which has undefined behaviour for a
// missing key on a const json). Values are escaped before being placed into
// the statement because they originate from the network. A failed INSERT is
// logged and processing continues with the next element.
//
// Throws std::runtime_error if a value cannot be escaped.
void Database::SaveCpuToDatabase(json const& cpuData) {
    // Escapes a value for use inside a single-quoted SQL string literal.
    auto const escape = [this](std::string const& raw) {
        // mysql_real_escape_string needs room for 2 * length + 1 bytes.
        std::string escaped(raw.size() * 2 + 1, '\0');
        unsigned long const written = ::mysql_real_escape_string(
            handle_, escaped.data(), raw.data(), static_cast<unsigned long>(raw.size()));
        if (written == static_cast<unsigned long>(-1))
            throw std::runtime_error("Failed to escape value for SQL statement");
        escaped.resize(written);
        return escaped;
    };

    for (auto const& processor : cpuData) {
        // Extract information for each processor
        std::string const cores = escape(processor.at("Cores").get<std::string>());
        std::string const model = escape(processor.at("Model").get<std::string>());
        std::string const siblings = escape(processor.at("Siblings").get<std::string>());
        // Generate query and save data to the DB
        std::string const query = "INSERT INTO cpu_table (cores, model, siblings) VALUES ('" + cores +
                                  "', '" + model + "', '" + siblings + "')";
        if (::mysql_query(handle_, query.c_str()) != 0) {
            std::cerr << "Error occurred while saving CPU information to the database: "
                      << ::mysql_error(handle_) << std::endl;
        }
    }
}
// Inserts one row into nic_table for every element of `nicData`.
//
// Each element must provide string values for "Interface", "MAC Address",
// "Operational State" and "Speed"; a missing key or a non-string value raises
// a json exception (at() is used instead of operator[], which has undefined
// behaviour for a missing key on a const json). Values are escaped before
// being placed into the statement because they originate from the network.
// A failed INSERT is logged and processing continues with the next element.
//
// Throws std::runtime_error if a value cannot be escaped.
void Database::SaveNicToDatabase(json const& nicData) {
    // Escapes a value for use inside a single-quoted SQL string literal.
    auto const escape = [this](std::string const& raw) {
        // mysql_real_escape_string needs room for 2 * length + 1 bytes.
        std::string escaped(raw.size() * 2 + 1, '\0');
        unsigned long const written = ::mysql_real_escape_string(
            handle_, escaped.data(), raw.data(), static_cast<unsigned long>(raw.size()));
        if (written == static_cast<unsigned long>(-1))
            throw std::runtime_error("Failed to escape value for SQL statement");
        escaped.resize(written);
        return escaped;
    };

    for (auto const& nic : nicData) {
        // Extract information for each NIC.
        // `interface` is a macro in some Windows headers, hence `iface`.
        std::string const iface = escape(nic.at("Interface").get<std::string>());
        std::string const mac_address = escape(nic.at("MAC Address").get<std::string>());
        std::string const operational_state = escape(nic.at("Operational State").get<std::string>());
        std::string const speed = escape(nic.at("Speed").get<std::string>());
        // Generate query and save data to the DB
        std::string const query =
            "INSERT INTO nic_table (interface, mac_address, operational_state, speed) VALUES ('" + iface +
            "', '" + mac_address + "', '" + operational_state + "', '" + speed + "')";
        if (::mysql_query(handle_, query.c_str()) != 0) {
            std::cerr << "Error occurred while saving NIC information to the database: "
                      << ::mysql_error(handle_) << std::endl;
        }
    }
}
// Placeholder for saving the "Memory" section. The body is empty, so the data
// passed in is currently ignored.
void Database::SaveMemoryToDatabase(json const& /*memoryData*/) {
// TODO: implement along the lines of the CPU/NIC savers.
// ...
}
// Placeholder for saving the "Disk" section. The body is empty, so the data
// passed in is currently ignored.
void Database::SaveDiskToDatabase(json const& /*diskData*/) {
// TODO: implement along the lines of the CPU/NIC savers.
// ...
}