在我的代理代码中,当从客户端(代理充当服务器)收到断开连接信号时,它无法正确终止服务器端(代理充当客户端)的关联会话。当客户端重新连接时,它通过充当客户端的代理与服务器建立新的连接。但是,如果客户端由于软件错误而反复重新启动,则考虑到代理不断作为客户端发起新连接,则可能会耗尽服务器的最大文件描述符。我实现了一个名为 close_client_connection 的函数,该函数旨在当从客户端检测到套接字断开连接事件时终止与服务器的连接。这是正确的方法吗?或者您有其他建议吗?
#include <boost/asio.hpp>
using error_code = boost::system::error_code;
namespace asio = boost::asio;
using socket_t = std::shared_ptr<asio::ip::tcp::socket>;
...
....
void bridge::start_server()
{
try
{
socket_t socket = std::make_shared<boost::asio::ip::tcp::socket>(io_context_);
acceptor_.async_accept(*socket, [this, socket](auto ec)
{
if(ec)
log_error("Error accepting connection from : %s",ec.message().c_str());
else
this->handle_accept(ec, socket); });
}
catch (const std::exception &e)
{
log_error("Start_server, Exception caught: %s", e.what());
}
}
void bridge::handle_accept(error_code const &error, socket_t upstream_socket)
{
if (!error)
{
socket_t downstream_socket = std::make_shared<boost::asio::ip::tcp::socket>(io_context_);
auto handler = std::make_shared<msg_handler>(io_context_, upstream_socket, downstream_socket, app_config);
handler->start_async_connect();
}
else
{
log_error("Error! Error code = %d \t Message : %s", error.value(), error.message().c_str());
}
if (!_stopped)
{
log_error("Connection closed by the client");
start_server();
}
else
{
acceptor_.close();
}
}
...
...
void msg_handler::close_client_connection(){
if (client_socket_->is_open())
client_socket_->cancel();
client_socket_->close();
}
void msg_handler::stop()
{
if (server_socket_->is_open())
server_socket_->cancel(); // closing would be a race condition
if (client_socket_->is_open())
client_socket_->cancel();
};
void msg_handler::start_async_connect()
{
auto upstream_host = app_config->clientConfig.ip;
auto upstream_port = app_config->clientConfig.port;
asio::ip::tcp::resolver::query query(upstream_host, std::to_string(upstream_port), asio::ip::resolver_query_base::numeric_service);
asio::ip::tcp::resolver::iterator client_endpoint_iterator_ = resolver_.resolve(query);
bool connected = false;
int server_connect_retry_timeout = app_config->clientConfig.timeout;
asio::async_connect(*client_socket_, client_endpoint_iterator_,
[me = shared_from_this(), server_connect_retry_timeout](const boost::system::error_code &ec, const asio::ip::tcp::resolver::iterator)
{
if (!ec)
{
me->handle_server_connection(ec);
}
else
{
log_error("Async connect failed for server %s, Error:%s", me->get_server().c_str(), ec.message().c_str());
me->stop();
log_info("Retrying in %d seconds", server_connect_retry_timeout);
std::this_thread::sleep_for(std::chrono::seconds(server_connect_retry_timeout));
me->start_async_connect();
}
});
}
void msg_handler::handle_server_connection(error_code const &error)
{
if (!error)
{
read_cmd_from_client();
read_cmd_from_server();
}
else
{
log_error("Connect Error! Error code = %d . Message : %s", error.value(), error.message().c_str());
stop();
}
}
void msg_handler::read_cmd_from_client()
{
boost::system::error_code error_code;
asio::ip::tcp::endpoint endpoint = remote_endpoint(error_code);
if (!error_code)
{
auto remote_ip = endpoint.address().to_string();
auto port = endpoint.port();
}
else
{
if (server_socket_ && client_socket_)
log_error("Error in remote endpoint client: %s", error_code.message().c_str());
return;
}
asio::async_read_until(*server_socket_,
in_logket_,
'\n',
[me = shared_from_this(), key](boost::system::error_code const &ec, std::size_t bytes_xfer)
{
if (ec == asio::error::eof)
{
log_error("Connection closed by client %s", key.c_str());
me->close_client_connection();
return; // No need to read further; the connection is closed.
}
else if (ec)
{
log_error("Error in async_read_until: %s", ec.message().c_str());
return;
}
else
{
me->read_cmd_from_client_done(ec, bytes_xfer);
}
});
}
void msg_handler::read_cmd_from_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
log_debug("Reading socket fd: %d,Sending socket fd:%d", server_socket_->is_open(), client_socket_->is_open());
if (ec == asio::error::eof)
{
log_error("Connection closed by client");
close_client_connection();
return; // No need to read further; the connection is closed.
}
else if (ec)
{
log_error("Error accepting logket from the client: %s", ec.message().c_str());
return;
}
std::string command(buffers_begin(in_logket_.data()), buffers_begin(in_logket_.data()) + bytes_transferred);
in_logket_.consume(bytes_transferred);
log_info("From Client: <--- %s", command.c_str());
CmdSender sender(client_socket_);
log_info("To Server: ---> %s", recv_cmd.c_str());
sender.send_cmd(recv_cmd);
}
read_cmd_from_client();
}
...
...
....
抱歉,代码不完整。现在我的分析很可能是
is_open()
调用是一种竞争条件,而调用 cancel()
是一种 数据竞争。甚至没有足够的代码来查看哪个套接字与您的 upstream_socket
中的 msg_handler
相匹配。在“start_async_connect”中执行阻止 DNS 解析是......一个谎言并要求稳定性问题。我不明白为什么套接字是指针,如果整个 msg_handler 是共享的_from_this ...
在寻找处理客户端 EOF/断开连接的位置时,这些正是引起我注意的事情。
发现后,
为什么这个逻辑是重复的?它在 lambda 中,但在
read_cmd_from_client_done
中重复
你有明确的
if (ec == asio::error::eof)
{
log_error("Connection closed by client");
close_client_connection();
return; // No need to read further; the connection is closed.
}
当您添加类似
close_server_connection()
之类的内容时,您的错误似乎已修复?我的意思是,这就是缺失的行为,对吗?
此完成处理程序当前丢弃部分成功读取的数据(错误== eof)。这……可能也是一个错误。