我的服务器中 boost::asio async_connect 的问题是什么?

问题描述 投票:0回答:1
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/bind.hpp>
#include "http_server.hh"

using namespace boost::asio;
using ip::tcp;

std::string desired_IP_address = "172.16.0.2"; // For example purposes

// One accepted connection. The session reads a request from `socket_`,
// performs a fixed HTTP GET against an upstream server over `client_socket_`,
// and then writes the result of handle_request(path_) back to `socket_`.
// Every asynchronous step captures `self` (a shared_ptr) so that the session
// stays alive while an operation is pending.
class Session : public std::enable_shared_from_this<Session> {
public:
    // Takes ownership of both sockets; `resolver` is only referenced and must
    // outlive the session.
    Session(ip::tcp::resolver& resolver,tcp::socket socket, tcp::socket client_socket) : socket_(std::move(socket)), resolver_(resolver), client_socket_(std::move(client_socket)) {}

    // Entry point: begins the asynchronous read of the incoming request.
    void start() { 
        do_read();
    }

private:
    // Reads (at most data_.size() bytes of) the request, extracts its path and
    // then resolves the upstream address.
    void do_read() {
        auto self(shared_from_this());
        socket_.async_read_some(
            boost::asio::buffer(data_),
            [this, self](boost::system::error_code ec, std::size_t length) {
                if (!ec) {
                    std::string request(data_.data(), length);
                    path_ = extract_path(request);
                    resolver_.async_resolve(
                        ip::tcp::resolver::query("172.16.0.4", "5000"),
                        [this,self](const boost::system::error_code& ec,
                            ip::tcp::resolver::iterator it) {
                            if (ec) {
                                std::cout  << "Error resolving " << "localhost" << ": "
                                        << ec.message()<< std::endl;
                                return;
                            }

                            // For simplicity, we'll assume the first endpoint will always
                            // be available.
                            //std::cout << "localhost" << ": resolved to " << it->endpoint()
                            //        << std::endl;
                            do_connect(it->endpoint());
                        });
                    //handle_request_async(path);
                }
            });
    }

    // Connects `client_socket_` to the upstream server.
    // NOTE(review): `dest` is not used; the hard-coded endpoint below is used
    // instead of the resolver result.
    void do_connect(const ip::tcp::endpoint& dest) {
        // Remember that the Asio library will make copies of parameters passed
        // by const reference, so it's ok to let the endpoint go out of scope
        // when this method returns.
        auto self(shared_from_this());
        boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string("172.16.0.4"), 5000);
        client_socket_.async_connect(
            endpoint, [this, self](const boost::system::error_code& ec) {
                if (ec) {
                    std::cout << "Error connecting to " << "localhost"<< ": "
                               << ec.message()<< std::endl;
                    return;
                }

                //std::cout << "localhost" << ": connected to "
                //          << client_socket_.remote_endpoint() << std::endl;
                do_send_http_get();
            });
    }

    // Sends a fixed "GET /" request upstream. The request text lives in the
    // member `request_` so the buffer stays valid until the write completes.
    void do_send_http_get() {
        auto self(shared_from_this());
        // At minimum, the remote server needs to know the path being fetched
        // and the host serving that path. The latter is required because a
        // single server often hosts multiple domains.
        request_ = std::string("GET /") + " HTTP/1.1\r\nHost: " + "example.com" + "\r\n\r\n";
        async_write(
            client_socket_, buffer(request_),
            [this,self](const boost::system::error_code& ec, std::size_t size) {
                if (ec) {
                    std::cout << "Error sending GET " << ec<< std::endl;
                    return;
                }

                //std::cout << "localhost" << ": sent " << size << " bytes"<< std::endl;
                do_recv_http_get_header();
            });
    }

    // Reads the upstream response header and decides how to read the body
    // (Content-Length or chunked).
    void do_recv_http_get_header() {
        // Since HTTP/1.1 is a text based protocol, most of it is human readable
        // by design. Notice how the "double end of line" character sequence
        // ("\r\n\r\n") is used to delimit message sections.
        auto self(shared_from_this());
        async_read_until(
            client_socket_, response_, "\r\n\r\n",
            [this, self](const boost::system::error_code& ec, std::size_t size) {
                if (ec) {
                    std::cout << "Error receiving GET header " << ec;
                    return;
                }

                //std::cout << "localhost:5000" << ": received " << size << ", streambuf "
                //          << response_.size();

                // The asio::streambuf class can use multiple buffers
                // internally, so we need to use a special iterator to copy out
                // the header.
                std::string header(
                    buffers_begin(response_.data()),
                    buffers_begin(response_.data()) + size);
                response_.consume(size);

                //std::cout << "----------" << std::endl << "localhost:5000"
                //          << ": header length " << header.size() << std::endl
                //          << header << std::endl;

                // First we'll check for the explicit "Content-Length" length
                // field. This provides the exact body length in bytes.
                size_t pos = header.find("Content-Length: ");
                if (pos != std::string::npos) {
                    size_t len = std::strtoul(
                        header.c_str() + pos + sizeof("Content-Length: ") - 1,
                        nullptr, 10);
                    // NOTE(review): unsigned subtraction; wraps around if more
                    // than `len` bytes are already buffered in response_.
                    do_receive_http_get_body(len - response_.size());
                    return;
                }

                // The other alternative is a chunked transfer. There is a quick
                // way to determine the remaining length in this case.
                pos = header.find("Transfer-Encoding: chunked");
                if (pos != std::string::npos) {
                    do_receive_http_get_chunked_body();
                    return;
                }

                std::cout << "Unknown body length";
            });
    }

    // Reads exactly `len` more body bytes from upstream into response_.
    void do_receive_http_get_body(size_t len) {
        // For "Content-Length" we know exactly how many bytes are left to
        // receive.
        auto self(shared_from_this());
        async_read(
            client_socket_, response_, transfer_exactly(len),
            [this,self] (const boost::system::error_code& ec, std::size_t size) {
               handle_http_get_body(ec, size);
            });
    }

    // Reads a chunked body from upstream up to the next "\r\n\r\n".
    void do_receive_http_get_chunked_body() {
        // For chunked transfers the final body chunk will be terminated by
        // another "double end of line" delimiter.
        auto self(shared_from_this());
        async_read_until(
            client_socket_, response_, "\r\n\r\n",
            [this,self] (const boost::system::error_code& ec, std::size_t size) {
                handle_http_get_body(ec, size);
            });
    }

    // Completion step for both body-reading variants: copies the buffered body
    // out of response_ and then answers the original client.
    void handle_http_get_body(const boost::system::error_code& ec,
                              std::size_t size) {
        if (ec) {
            std::cout << "Error receiving GET body " << ec;
            return;
        }

        //std::cout << "localhost:5000" << ": received " << size << ", streambuf "
        //          << response_.size();

        // We can finally consume the body and print it out if desired.
        const auto& data = response_.data();
        std::string response_body(buffers_begin(data), buffers_end(data));
        response_.consume(size);

        //std::cout << "----------" << std::endl << "localhost:5000" << ": body length "
        //          << response_body.size() << std::endl;
        //std::cout << response_body << std::endl;
        handle_request_async();
    }


    // Produces the response for path_ and writes it to the accepted socket,
    // shutting the socket down after a successful write.
    void handle_request_async() {
        auto self(shared_from_this());

        async_response([this,self](const std::string& response) {
            // std::cout << "path " <<  path_ << std::endl;
            // std::cout << response << std::endl;
            // NOTE(review): `response` refers to a local of async_response()
            // that is destroyed as soon as this callback returns, while
            // async_write only completes later.
            async_write(socket_, boost::asio::buffer(response),
                [this,self](boost::system::error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        //std::cout << "response" << std::endl;
                        boost::system::error_code ignored_ec;
                        socket_.shutdown(tcp::socket::shutdown_both, ignored_ec);
                    }
                });
        });
    }

    // Calls handle_request(path_) synchronously and passes the response body
    // to `callback` before deleting the response object.
    void async_response(std::function<void(const std::string&)> callback) {
        // Assuming handle_request returns HTTP_Response asynchronously
        HTTP_Response* htmlResponse = handle_request(path_);
        //std::cout << "path: " << path << std::endl;
        std::string response = htmlResponse->body;
        //std::cout << "content: " << response << std::endl;
        callback(response);
        
        delete htmlResponse;
    }

    // Function to extract path from the HTTP request.
    // Returns the second space-separated token of the first line, or an empty
    // string when no "\r\n" is found or the line has fewer than two tokens.
    std::string extract_path(const std::string& request) {
        // Logic to extract path from the request string
        // Example logic: extracting the path after the GET method
        std::string path;
        //std::cout << request << std::endl;
        // Implement your path extraction logic here

        // Find the end of the request line (the first line of the HTTP request)
        std::size_t requestLineEnd = request.find("\r\n");
        if (requestLineEnd != std::string::npos) {
            std::string requestLine = request.substr(0, requestLineEnd);

            // Split the request line into parts (method, path, protocol)
            std::vector<std::string> parts;
            boost::split(parts, requestLine, boost::is_any_of(" "));

            // The second part typically contains the path (e.g., "GET /path HTTP/1.1")
            if (parts.size() >= 2) {
                path = parts[1]; // Extract the path from the request line
            }
        }

        return path;
    }

    tcp::socket socket_;          // connection accepted from the client
    tcp::socket client_socket_;   // outgoing connection to the upstream server
    std::array<char, 8192> data_; // raw bytes of the incoming request
    std::string path_;            // path extracted from the incoming request

    std::string request_;              // outgoing GET request text
    boost::asio::streambuf response_;  // upstream response buffer
    ip::tcp::resolver& resolver_;      // shared resolver, owned elsewhere

};

// Listens on desired_IP_address:port and starts one Session per accepted
// connection.
class Server {
public:
    // Binds the acceptor and immediately starts the asynchronous accept loop.
    Server(boost::asio::io_context& io_context, short port)
        : acceptor_(io_context, tcp::endpoint(boost::asio::ip::make_address(desired_IP_address), port)),
          //acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
          socket_(io_context), resolver_(io_context), client_socket_(io_context)
    {
        do_accept();
    }

private:
    // Accepts into socket_, hands the sockets to a new Session, and re-arms
    // itself regardless of whether the accept succeeded.
    void do_accept() {
        acceptor_.async_accept(
            socket_,
            [this] (boost::system::error_code ec) {
                if (!ec) {
                    // NOTE(review): client_socket_ is moved from on every
                    // accept and never re-created, so every session after the
                    // first receives a moved-from socket.
                    std::make_shared<Session>(resolver_, std::move(socket_), std::move(client_socket_))->start();
                }
                do_accept();
            });
    }

    tcp::acceptor acceptor_;     // listening socket
    tcp::socket socket_;         // target of the pending accept
    tcp::socket client_socket_;  // single upstream socket handed to sessions
    ip::tcp::resolver resolver_; // passed by reference to every Session
};

// Creates the io_context and the server on port 8080, then runs the
// io_context from the threads of a 40-thread pool.
int main() {
    try {
        boost::asio::io_context io_context;
        Server server(io_context, 8080);
        boost::asio::thread_pool pool(40);
        // Posts 40 tasks to the pool; each task blocks inside
        // io_context.run() for as long as the io_context has work.
        for (std::size_t i = 0; i < 40; ++i)
            boost::asio::post(pool, [&io_context]() { io_context.run(); });

        pool.join();
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }

    // Note: 0 is returned even when an exception was caught above.
    return 0;
}

我收到的错误是 “Error connecting to localhost: Cannot assign requested address”(连接到 localhost 时出错:无法分配请求的地址),它出现在 async_connect 调用中。当服务器被绑定到单个 CPU 核心、且该核心的利用率达到 100% 时,就会出现此错误。同时还观察到吞吐量周期性下降。

注意:此服务器与其尝试连接的服务器位于不同的 Linux 网络命名空间中

补充一点:此问题仅在跨网络命名空间通信时才会发生;当两个服务器运行在同一个 Linux 网络命名空间中或都运行在 localhost 上时不会发生。

c++ boost boost-asio
1个回答
0
投票

io_context.run() 是阻塞调用。

您把它向线程池投递(post)了 40 次,而线程池恰好也只有 40 个服务线程;讽刺的是,这恰恰保证了线程池中没有任何线程还能继续处理其他工作。

看来您把 asio::thread_pool 误认为是类似 boost::thread_group 的东西。与之不同的是,asio::thread_pool 本身就是一个 execution_context(is-a 关系),并且按照定义,它的所有线程都已经在运行(run)该线程池的服务。

总而言之,更简单的 main 看起来像:

// Runs the server directly on an asio::thread_pool: the pool is itself an
// execution context, so no separate io_context or manual run() calls are
// needed.
int main() try {
    asio::thread_pool pool;
    Server            server(pool.get_executor(), 8080);
    pool.join(); // blocks while the pool still has outstanding work
} catch (std::exception const& e) {
    std::cerr << "Exception: " << e.what() << std::endl;
    return 1; // without this the handler falls off the end and main returns 0
}

恐怕代码的其余部分有一些危险信号:

  • async_response 接受一个
    std::function
    参数,这有可能绕过 Asio 的执行器(executor)机制
  • 您使用了很多线程。这本身就是一种反模式,而且问题尤其严重,因为没有任何操作使用 strand(串行化执行器)
  • 您在手写 HTTP 消息“解析”。建议改用现成的库(例如 Boost Beast)。具体来说,您的代码看起来无法处理 HTTP 1.1 的各项特性
  • 每次 accept 都从同一个
    client_socket_
    移动(move)出套接字。这……充其量是毫无用处的
  • 确实没有任何理由“假设第一个端点有效”。直接使用
    asio::async_connect 即可

归根结底,我并不清楚您是只想实现一个简单的 HTTP 中继代理(看起来很像),还是……一个通用的 HTTP 服务器。目前看起来,您收到一个请求后会把它转发给上游。但是,按照给出的代码,来自上游服务器的响应……却被当作请求来处理?!随后它又被交给

handle_request
处理。看起来这段代码放错了地方。

无论如何,下面是我花了不少时间研究这段代码之后的结果,其中包含许多您或许用得上的改进。我用下面的代码代替了缺失的头文件

http_server.hh
:

using Request  = http::request<http::string_body>;
using Response = http::response<http::string_body>;

/// Builds the response for a request target.
///
/// @param path  Request target as received (e.g. "/index.html").
/// @return The file contents when the target names a regular file below the
///         working directory; a 500 response when that file cannot be opened;
///         otherwise a placeholder body "TODO:<resolved path>".
http::message_generator handle_request(fs::path path) {
    std::cout << "path " << path << std::endl;

    // Request targets are absolute ("/foo"), and fs::path::operator/ discards
    // its left operand when the right operand is absolute. Strip the root so
    // the target really is resolved below the working directory.
    // NOTE: still not secure - ".." segments are not filtered.
    path = fs::current_path() / path.relative_path(); // emulate virtual root

    if (exists(path) && fs::is_regular_file(path)) {
        http::response<http::file_body> res(http::status::ok, 11);

        error_code ec;
        res.body().open(path.c_str(), boost::beast::file_mode::scan, ec);
        if (ec) {
            // Do not answer 200 with an unopened body when the open failed.
            std::cout << "open failed: " << ec.message() << std::endl;
            http::response<http::string_body> err(http::status::internal_server_error, 11);
            err.body() = "Cannot open requested file";
            std::cout << err << std::endl;
            return err;
        }
        std::cout << res.base() << std::endl;
        return res;
    } else {
        http::response<http::string_body> res(http::status::ok, 11);
        res.body() = "TODO:" + path.native();
        std::cout << res << std::endl;
        return res;
    }
}

对于与服务器工作目录中现有文件相对应的路径,它会发送该文件作为响应;否则返回一个以

TODO:
开头、后跟请求路径的正文。

在 Coliru 上在线运行(Live On Coliru)

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <filesystem>
#include <iostream>

namespace asio = boost::asio;
namespace http = boost::beast::http;
namespace fs   = std::filesystem;
using asio::ip::tcp;
using boost::system::error_code;

// #include "http_server.hh"
using Request  = http::request<http::string_body>;
using Response = http::response<http::string_body>;

/// Builds the response for a request target.
///
/// @param path  Request target as received (e.g. "/index.html").
/// @return The file contents when the target names a regular file below the
///         working directory; a 500 response when that file cannot be opened;
///         otherwise a placeholder body "TODO:<resolved path>".
http::message_generator handle_request(fs::path path) {
    std::cout << "path " << path << std::endl;

    // Request targets are absolute ("/foo"), and fs::path::operator/ discards
    // its left operand when the right operand is absolute. Strip the root so
    // the target really is resolved below the working directory.
    // NOTE: still not secure - ".." segments are not filtered.
    path = fs::current_path() / path.relative_path(); // emulate virtual root

    if (exists(path) && fs::is_regular_file(path)) {
        http::response<http::file_body> res(http::status::ok, 11);

        error_code ec;
        res.body().open(path.c_str(), boost::beast::file_mode::scan, ec);
        if (ec) {
            // Do not answer 200 with an unopened body when the open failed.
            std::cout << "open failed: " << ec.message() << std::endl;
            http::response<http::string_body> err(http::status::internal_server_error, 11);
            err.body() = "Cannot open requested file";
            std::cout << err << std::endl;
            return err;
        }
        std::cout << res.base() << std::endl;
        return res;
    } else {
        http::response<http::string_body> res(http::status::ok, 11);
        res.body() = "TODO:" + path.native();
        std::cout << res << std::endl;
        return res;
    }
}

struct Session : std::enable_shared_from_this<Session> {
    Session(tcp::socket socket)
        : downstream_{std::move(socket)}
        , upstream_{downstream_.get_executor()}
        , resolver_{downstream_.get_executor()} {}

    ~Session() { std::cerr << "Session closed" << std::endl; }

    void start() {
        std::cerr << "Accepted from " << downstream_.remote_endpoint() << std::endl;
        do_read();
    }

  private:
    void do_read() {
        http::async_read(
            downstream_, downbuf_, request_,
            [this, self = shared_from_this()](error_code ec, size_t /*length*/) {
                std::cout << "Received " << ec.message() << ": " << request_.base() << std::endl;
                if (ec)
                    return;
                path_     = request_.target();
                auto host = request_.at(http::field::host);
                auto port = "http"; // TODO parse port from host?

                resolver_.async_resolve(host, port, [this, self](error_code ec, auto eps) {
                    std::cout << "Resolved upstream: " << ec.message() << std::endl;
                    if (ec)
                        return;

                    for (auto&& ep : eps)
                        std::cout << " - candidate: " << ep.endpoint() << std::endl;

                    asio::async_connect(upstream_, eps, [this, self](error_code ec, tcp::endpoint) {
                        std::cout << "Connect upstream: " << ec.message() << std::endl;
                        if (!ec) {
                            std::cout << "upstream: " << upstream_.remote_endpoint() << std::endl;
                            do_relay();
                        }
                    });
                });
            });
    }

    void do_relay() {
#if true
        // keep the request as received from downstream_
#else
        // At minimum, server needs the path & host header
        request_ = Request{http::verb::get, "/", 11};
        request_.set(http::field::host, "example.com");
#endif

        async_write(upstream_, request_, [this, self = shared_from_this()](error_code ec, size_t size) {
            std::cout << "Relayed upstream: " << ec.message() << std::endl;
            if (!ec) {
                std::cout << "sent to upstream: " << size << " bytes" << std::endl;
                do_recv_response();
            }
        });
    }

    void do_recv_response() {
        response_.clear();
        http::async_read(
            upstream_, upbuf_, response_, [this, self = shared_from_this()](error_code ec, size_t /*size*/) {
                if (ec) {
                    std::cout << "Error receiving GET header " << ec.message();
                    return;
                }

                std::cout << "-- Response headers -- " << response_.base() << std::endl;

                if (response_.has_content_length()) {
                    std::cout << "-- Content-Length header = " << response_.at(http::field::content_length)
                              << std::endl;
                }

                if (response_.chunked())
                    std::cout << "-- Received using chunked encoding" << std::endl;

                std::cout << "-- Actual Body Length: " << response_.body().length() << std::endl;
                std::cout << "-- Body:               " << response_.body() << std::endl;

                // this looks REALY out of place
                boost::beast::async_write( //
                    downstream_, handle_request(path_),
                    [this, self = shared_from_this()](error_code ec, size_t /*length*/) {
                        if (!ec) {
                            // std::cout << "response" << std::endl;
                            error_code ignored_ec;
                            downstream_.shutdown(tcp::socket::shutdown_both, ignored_ec);
                        }
                    });
            });
    }

    tcp::socket     downstream_;
    asio::streambuf downbuf_;
    tcp::socket     upstream_;
    asio::streambuf upbuf_;

    Request         request_;
    Response        response_;
    std::string     path_;

    tcp::resolver   resolver_;
};

struct Server {
    Server(asio::any_io_executor ex, uint16_t port, std::string bind_address = "172.16.0.2")
        : acceptor_(ex, tcp::endpoint(asio::ip::make_address(bind_address), port)) {
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept(make_strand(acceptor_.get_executor()), [this](error_code ec, tcp::socket s) {
            if (!ec)
                std::make_shared<Session>(std::move(s))->start();

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
};

// Runs the server on port 8080 of all IPv4 interfaces, using the threads of
// an asio::thread_pool as the execution context.
int main() try {
    asio::thread_pool pool;
    Server            server(pool.get_executor(), 8080, "0.0.0.0");
    pool.join(); // blocks while the pool still has outstanding work
} catch (std::exception const& e) {
    std::cerr << "Exception: " << e.what() << std::endl;
    return 1; // without this the handler falls off the end and main returns 0
}

带有本地互动演示:

© www.soinside.com 2019 - 2024. All rights reserved.