Boost::Asio 异步客户端和服务器代理实现 - 跨实例访问客户端和服务器套接字

问题描述 投票:0回答:1

我正在致力于实现一个客户端和服务器代理系统,其中代理负责接收来自客户端的命令并将其转发到服务器,反之亦然。但是,我在从服务器实例访问客户端套接字以将消息转发到客户端时面临着挑战[反之亦然]。我目前正在学习 Boost.Asio 行为。您能否建议在服务器和客户端实例之间共享套接字的最有效方法?以下是当前的实现

在 cmd_handler::read_cmd_done API 中 - client_server_.is_open() 为 0 但不是 1。

#pragma once
#ifndef __OAMIP_PROXY_H__
#define __OAMIP_PROXY_H__

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
#include <vector>
#include <functional>
#include <deque>
#include "config.h"

namespace asio = boost::asio;
class cmd_handler: public std::enable_shared_from_this<cmd_handler>
{
public:
    cmd_handler(asio::io_context &io_context, AppConfig* appConfig);
    ~cmd_handler();
    asio::ip::tcp::socket &socket();
    asio::ip::tcp::socket &c_socket();
    asio::ip::tcp::endpoint remote_endpoint();
    asio::ip::tcp::endpoint c_remote_endpoint();
    asio::io_context &m_io_context();
    void start();
    void read_cmd();
    void read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
    void read_cmd_client();
    void read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
private:
    asio::io_context& io_context_;
    asio::ip::tcp::socket server_socket_;
    asio::ip::tcp::socket client_socket_;
    asio::io_context::strand write_strand_;
    asio::streambuf in_packet_;
    std::deque<std::string> send_cmd_queue;
    std::mutex queue_mutex_;  // Added for thread safety
    AppConfig *app_config;
};


class ProxyServer
{
    using shared_handler_t = std::shared_ptr<cmd_handler>;
public:
    ProxyServer(int thread_count, AppConfig* appConfig);;
    ~ProxyServer();
    void start_server(std::string ip_addr, int port);
    void start_client(std::string ip_addr, int port);
    void handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec);
private:
    asio::io_context io_context_;
    int thread_count_;
    asio::ip::tcp::acceptor acceptor_;
    asio::ip::tcp::resolver resolver_;
    std::vector<std::thread> thread_pool_;
    asio::streambuf buffer_;
    AppConfig *app_config;
};

#endif //  __OAMIP_PROXY_H__

nagarajans1@PA168951:~/Projects/OpenAMIPProxy$ cat proxy.cpp
#include "proxy.h"
#include "parser.cpp"
#include "send.cpp"

ProxyServer::ProxyServer(int thread_count,
                         AppConfig *appconfig)
    : thread_count_(thread_count),
      acceptor_(io_context_),
      resolver_(io_context_),
      thread_pool_(),
      app_config(appconfig)
{
}
/*******************************************************************************************/
ProxyServer::~ProxyServer()
{
    // Stop and join the io_context to prevent memory leaks
    io_context_.stop();
    for (auto &thread : thread_pool_)
    {
        thread.join();
    }
}
/*******************************************************************************************/
void ProxyServer::start_server(std::string ip_addr, int port)
{
    std::cout << "Starting Server, IP:" << ip_addr << ", Port:" << port << std::endl;
    auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
    asio::ip::address_v4 ipv4_address = asio::ip::address_v4::from_string(ip_addr);
    asio::ip::tcp::endpoint endpoint(ipv4_address, port);
    acceptor_.open(endpoint.protocol());
    acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
    acceptor_.bind(endpoint);
    acceptor_.listen();
    std::cout << "Start listening" << std::endl;
    try
    {
        acceptor_.async_accept(handler->socket(), [=](auto ec)
                               {
                                    if(ec)
                                        std::cerr << "Error accepting connection from : " << ec.message() << std::endl;
                                    else
                                        handle_new_connection(handler, ec); });
    }
    catch (const std::exception &e)
    {
        std::cerr << "Exception caught: " << e.what() << std::endl;
    }
    io_context_.run();
        // start pool of threads to process the asio events
 /*        for (int i = 0; i < thread_count_; ++i)
    {
        thread_pool_.emplace_back([=]
                                  { io_context_.run(); });
    }
    for (auto &thread : thread_pool_)
    {
        thread.join();
    } */
}
void ProxyServer::start_client(std::string ip_addr, int port)
{
    std::cout << "Starting client" << std::endl;
    auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
    asio::ip::tcp::resolver::query query(ip_addr, std::to_string(port));
    asio::ip::tcp::resolver::iterator endpoint_iterator = resolver_.resolve(query);
    asio::connect(handler->c_socket(), endpoint_iterator);

    std::cout << "Connected to server on port " << port << std::endl;

    handler->read_cmd_client();
    io_context_.run();
}

/*******************************************************************************************/
void ProxyServer::handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec)
{
    std::cout << "Handle connection" << std::endl;
    if (ec)
    {
        std::cerr << "Error accepting connection from client: " << ec.message() << std::endl;
        return;
    }
    handler->read_cmd();
    auto new_handler = std::make_shared<cmd_handler>(io_context_, app_config);
    acceptor_.async_accept(new_handler->socket(), [=](auto ec)
                           { handle_new_connection(new_handler, ec); });
}
/*******************************************************************************************/
cmd_handler::cmd_handler(asio::io_context &io_context, AppConfig *appConfig)
    : io_context_(io_context), server_socket_(io_context), client_socket_(io_context),write_strand_(io_context), app_config(appConfig)
{

}
/*******************************************************************************************/
cmd_handler::~cmd_handler()
{
    // Explicitly clear the buffer to release the allocated memory
    in_packet_.consume(in_packet_.size());
}
/*******************************************************************************************/
asio::ip::tcp::socket &cmd_handler::socket()
{
    return server_socket_;
}
asio::ip::tcp::socket &cmd_handler::c_socket()
{
    return client_socket_;
}
asio::ip::tcp::endpoint cmd_handler::remote_endpoint()
{
    return server_socket_.remote_endpoint();
}
asio::ip::tcp::endpoint cmd_handler::c_remote_endpoint()
{
    return client_socket_.remote_endpoint();
}
asio::io_context &cmd_handler::m_io_context()
{
    return io_context_;
}
/*******************************************************************************************/
void cmd_handler::start()
{
    read_cmd();
}
/*******************************************************************************************/
void cmd_handler::read_cmd()
{
    auto remote_ip = remote_endpoint().address().to_string();
    std::cout << "Read command from " << remote_ip << std::endl;
    asio::async_read_until(server_socket_,
                           in_packet_,
                           '\n',
                           [me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
                           {
                               if (ec == asio::error::eof)
                               {
                                   std::cout << "Connection closed by client:" << std::endl;
                                   return; // No need to read further; the connection is closed.
                               }
                               else if (ec)
                               {
                                   std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
                                   return;
                               }
                               else
                               {
                                   me->read_cmd_done(ec, bytes_xfer);
                               }
                           });
}
void cmd_handler::read_cmd_client()
{
    std::cout << "Read cmd client" << std::endl;
    auto remote_ip = c_remote_endpoint().address().to_string();
    std::cout << client_socket_.is_open() << std::endl;
    std::cout << "Read command from " << remote_ip << std::endl;
    asio::async_read_until(client_socket_,
                           in_packet_,
                           '\n',
                           [me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
                           {
                               if (ec == asio::error::eof)
                               {
                                   std::cout << "Connection closed by client:" << std::endl;
                                   return; // No need to read further; the connection is closed.
                               }
                               else if (ec)
                               {
                                   std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
                                   return;
                               }
                               else
                               {
                                   me->read_cmd_client_done(ec, bytes_xfer);
                               }
                           });
}
void cmd_handler::read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
    auto remote_ip = remote_endpoint().address().to_string();
    if (ec == asio::error::eof)
    {
        std::cout << "Connection closed by client:" << remote_ip << std::endl;
        return; // No need to read further; the connection is closed.
    }
    else if (ec)
    {
        std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
        return;
    }

    std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
    in_packet_.consume(bytes_transferred);
    std::cout << "Connected server IP: " << remote_ip << std::endl;
    std::cout << "command:" << command << std::endl;
    Parser parser(app_config);
    std::string recv_cmd = parser.process_cmd(command);
    //CmdSender sender(server_socket_);
    //sender.send_cmd(recv_cmd);
    read_cmd();
};
/*******************************************************************************************/
void cmd_handler::read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
    auto remote_ip = remote_endpoint().address().to_string();
    if (ec == asio::error::eof)
    {
        std::cout << "Connection closed by client:" << remote_ip << std::endl;
        return; // No need to read further; the connection is closed.
    }
    else if (ec)
    {
        std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
        return;
    }

    std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
    in_packet_.consume(bytes_transferred);
    std::cout << "Connected client IP: " << remote_ip << std::endl;
    std::cout << "command:" << command << std::endl;
    Parser parser(app_config);
    std::string recv_cmd = parser.process_cmd(command);
    std::cout << client_socket_.is_open() <<c_socket().is_open() << socket().is_open()<< std::endl;
    CmdSender sender(client_socket_);
    sender.send_cmd(recv_cmd);
    read_cmd();
};
/*******************************************************************************************/
int main() {
  try
    {
    
        int thread_count = 1;
        int port = appConfig.serverConfig.port;
        int s_port = appConfig.clientConfig.port;
        std::vector<std::thread> thread_pool;
        std::string ip_addr = appConfig.serverConfig.ip;
   

        // start pool of threads to process the asio events
        for (int i = 0; i < thread_count; ++i)
        {
            thread_pool.emplace_back([&]()
                                     {  proxy_server.start_server(ip_addr, port); });
        }
        for (int i = 0; i < thread_count; ++i)
        {
                thread_pool.emplace_back([&]()
                                         { proxy_server.start_client(ip_addr, s_port);});
        }
        for (auto &thread : thread_pool)
        {
            thread.join();
        }
    }
    catch (const std::exception &e)
    {
        std::cerr << "Exception caught: " << e.what() << std::endl;
    }

    return 0;
}
c++ sockets boost-asio shared-ptr private-members
1个回答
0
投票
std::mutex queue_mutex_; // Added for thread safety

互斥锁除非使用,否则不会执行任何操作。

asio::io_context::strand write_strand_;

链同步对共享资源的访问。可能您只需要链,但不是为了“写入”,而是为了...资源(异步操作中使用的 IO 对象和缓冲区)。

您可能正在多个线程的同一端点上运行许多“服务器”和“客户端”。我不认为你想要那样。您可能只想让“服务器”接受多个连接。您可以通过在上一个连接之后接受更多连接来做到这一点。您已经在

handle_new_connection
结束时这样做了:

acceptor_.async_accept(new_handler->socket(),
                       [=](auto ec) { handle_new_connection(new_handler, ec); });

其他一些注意事项:

    // Explicitly clear the buffer to release the allocated memory
    in_packet_.consume(in_packet_.size());

这不是必需的。析构函数已经释放了所拥有的资源。那里不需要做任何事情。


会话管理

现在从逻辑上讲,每个接受的客户端连接都有自己的到代理服务器的连接。无需协调。只需将上游连接移至“CmdHandler”(将其重命名为 ProxySession 或其他名称),问题就会自行解决。

我本来打算编辑一些东西,但这会花费……超过合理的时间。相反,请考虑查看我最近评论的代理实现:Tcp proxy mysql。从 mysql-client 接收的数据以奇怪的符号输出它显示了基本上完全相同的想法。

© www.soinside.com 2019 - 2024. All rights reserved.