我如何使用从on_read处理程序分派的最终回调将响应异步地返回给调用者?

问题描述 投票:0回答:2

我需要为c ++客户端公开异步REST api,在内部使用boost :: beast发送REST请求/接收响应。

起点是http_client_async.cpp示例。

现在,客户端将使用此异步api传递回调函数,该函数需要在REST操作结束时从on_read()处理函数[http_client_async.cpp]中调用,并将完整的响应传递回调用者。

我该如何实现?

c++ boost boost-asio boost-beast
2个回答
0
投票

在呼出时参考this example

修改session构造函数以接受一个接受http状态整数和主体字符串的回调。

typedef std::function<void(unsigned int, const std::string&)> CALLBACK;

CALLBACK callback_;

explicit
session(net::io_context& ioc, CALLBACK& callback)
    : resolver_(net::make_strand(ioc))
    , stream_(net::make_strand(ioc))
    , _callback(callback)
{
}

修改session::on_read以调用回调。

void
on_read(
    beast::error_code ec,
    std::size_t bytes_transferred)
{

    if(ec)
    {
        _callback(0, "");
    }
    else
    {
        _callback(_res.result_int(), _res.body());
    }
}

0
投票

但是有什么方法可以通过asio的io_context调用_callback吗?我想以异步方式调用此回调,因为由用户提供的此回调可能会阻塞,从而也阻塞io_context的线程?类似于在io_context中调度其他处理程序(如on_read(),on_write()等)的方式吗?

是。您所追求的是async_result协议。其他答案(例如How can I get a future from boost::asio::post?)中也有一些示例。

这里是构建基块:

存储处理程序

在“会话”中(让我们将其重命名为http_request_op并将其隐藏在一些详细的名称空间中,您要记住完成处理程序。

不用担心,没有人必须提出这样的处理程序。我们将添加一个初始化功能 async_http_request,它将为您效劳。

最终用户可能会使用future或协程(yield_context)。当然,如果愿意,它们can可以提供普通的香草回调。

using Response = http::response<http::string_body>;

template <typename Handler>
class http_request_op : public std::enable_shared_from_this<http_request_op<Handler> > {
    // ...
    Response res_;
    Handler handler_;

    // ...
  public:

    template <typename Executor>
    explicit http_request_op(Executor ex, Handler handler)
        : resolver_(ex),
        stream_(ex),
        handler_(std::move(handler))
    { }

现在,在最后一步中,您将调用该handler_。为简单起见,我将fail助手设为成员函数,并将其命名为complete

void complete(beast::error_code ec, char const* what) {
    if (ec && what) {
        // TODO: A better idea would to make a custom `Response` type that
        // has room for "fail stage"
        res_.reason(what);
    }
    post(stream_.get_executor(), [this, ec, self=this->shared_from_this()] {
            handler_(ec, std::move(res_));
        });
}

现在检查ec并使用过fail的所有位置,现在都以相同的complete调用ec。另外,在on_read中,我们添加无条件补全:

void on_read(beast::error_code ec, size_t /*bytes_transferred*/) {
    if (ec)
        return complete(ec, "read");
    stream_.socket().shutdown(tcp::socket::shutdown_both, ec);

    // unconditional complete here
    return complete(ec, "shutdown");
}

启动功能(async_http_request

template <typename Context, typename Token>
auto async_http_request(Context& ctx, beast::string_view host, beast::string_view port, beast::string_view target, int version, Token&& token) {
    using result_type = typename net::async_result<std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type result(handler);

    std::make_shared<detail::http_request_op<handler_type> >
        (make_strand(ctx), std::move(handler))
            ->start(host, port, target, version);

    return result.get();
}

[您会看到这将创建一个异步结果,该结果将根据传递的令牌,http_request_op的踢打出“处理程序”,并返回异步结果。

返回的内容取决于传递的令牌。查看用法:

用法

我将展示最终用户可以选择使用此async_http_request启动功能的各种方式:

使用未来

auto future = async_http_request(ioc.get_executor(), host, port, target, version, net::use_future);
ioc.run();

std::cout << future.get() << "\n";

返回类型为std::future<Response>

[promise的创建和设置返回值/异常信息的设置由Asio神奇地处理。

使用协程/产量背景:

net::spawn(ioc, [&ioc,args](net::yield_context yield) {
    try {
        auto host   = args[0];
        auto port   = args[1];
        auto target = args[2];
        int version = args[3]=="1.0"? 10 : 11;

        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield);

        std::cout << res << std::endl;
    } catch (boost::system::system_error const& se) {
        // no way to get at response here
        std::cout << "There was an error: " << se.code().message() << std::endl;
    }
});

ioc.run();

返回类型在这里只是Response。请注意,如果报告了错误情况,则会引发异常。或者,传递一个error_code变量:

        beast::error_code ec;
        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield[ec]);

        std::cout << ec.message() << "\n" << res << std::endl;

仍然使用回调

/*void*/ async_http_request(ioc, host, port, target, version, 
    [](beast::error_code ec, Response const& res) {
        std::cout << ec.message() << "\b" << res << "\n";
    });

返回值最终只是void

完整演示代码

没有实时演示,因为没有在线编译器支持网络请求,并且它超出了编译限制(例如here

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <iostream>
#include <memory>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

using Response = http::response<http::string_body>;

namespace detail {
    template <typename Handler>
    class http_request_op : public std::enable_shared_from_this<http_request_op<Handler> > {
        tcp::resolver resolver_;
        beast::tcp_stream stream_;
        beast::flat_buffer buffer_;
        http::request<http::empty_body> req_;
        Response res_;
        Handler handler_;

        template <typename F>
        auto bind(F ptmf) { return beast::bind_front_handler(ptmf, this->shared_from_this()); }

        void complete(beast::error_code ec, char const* what) {
            if (ec && what) {
                // TODO: A better idea would to make a custom `Response` type that
                // has room for "fail stage"
                res_.reason(what);
            }
            post(stream_.get_executor(), [this, ec, self=this->shared_from_this()] {
                    handler_(ec, std::move(res_));
                });
        }
      public:
        template <typename Executor>
        explicit http_request_op(Executor ex, Handler handler)
          : resolver_(ex),
            stream_(ex),
            handler_(std::move(handler))
        { }

        void start(beast::string_view host, beast::string_view port, beast::string_view target, int version) {
            req_.version(version);
            req_.method(http::verb::get);
            req_.target(target);
            req_.set(http::field::host, host);
            req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
            resolver_.async_resolve(host.to_string(), port.to_string(), 
                bind_executor(stream_.get_executor(), bind(&http_request_op::on_resolve)));
        }

      private:
        void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
            if (ec)
                return complete(ec, "resolve");
            stream_.expires_after(std::chrono::seconds(30));
            stream_.async_connect(results, bind(&http_request_op::on_connect));
        }

        void on_connect(beast::error_code const& ec, tcp::endpoint const&) {
            if (ec)
                return complete(ec, "connect");
            stream_.expires_after(std::chrono::seconds(30));
            http::async_write(stream_, req_, bind(&http_request_op::on_write));
        }

        void on_read(beast::error_code ec, size_t /*bytes_transferred*/) {
            if (ec)
                return complete(ec, "read");
            stream_.socket().shutdown(tcp::socket::shutdown_both, ec);

            // unconditional complete here
            return complete(ec, "shutdown");
        }

        void on_write(beast::error_code ec, size_t /*bytes_transferred*/) {
            if (ec)
                return complete(ec, "write");
            http::async_read(stream_, buffer_, res_, bind(&http_request_op::on_read));
        }
    };
}

template <typename Context, typename Token>
auto async_http_request(Context& ctx, beast::string_view host, beast::string_view port, beast::string_view target, int version, Token&& token) {
    using result_type = typename net::async_result<std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type result(handler);

    std::make_shared<detail::http_request_op<handler_type> >
        (make_strand(ctx), std::move(handler))
            ->start(host, port, target, version);

    return result.get();
}

int main(int argc, char** argv) {
    std::vector<beast::string_view> args{argv+1, argv+argc};
    if (args.size() == 3) args.push_back("1.1");

    if (args.size() != 4) {
        std::cerr << "Usage: http-client-async <host> <port> <target> [<HTTP "
                     "version: 1.0 or 1.1(default)>]\n"
                  << "Example:\n"
                  << "    http-client-async www.example.com 80 /\n"
                  << "    http-client-async www.example.com 80 / 1.0\n";
        return 255;
    }

    auto host   = args[0];
    auto port   = args[1];
    auto target = args[2];
    int version = args[3]=="1.0"? 10 : 11;

    net::io_context ioc;

    net::spawn(ioc, [=,&ioc](net::yield_context yield) {
        try {
            Response res = async_http_request(
                    ioc,
                    host, port, target, version,
                    yield);

            std::cout << "From coro (try/catch): " << res.reason() << std::endl;
        } catch (boost::system::system_error const& se) {
            // no way to get at response here
            std::cout << "coro exception: " << se.code().message() << std::endl;
        }
    });

    net::spawn(ioc, [=,&ioc](net::yield_context yield) {
        beast::error_code ec;
        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield[ec]);

        std::cout << "From coro: " << ec.message() << ", " << res.reason() << "\n";
    });

    /*void*/ async_http_request(ioc, host, port, target, version, 
        [](beast::error_code ec, Response const& res) {
            std::cout << "From callback: " << ec.message() << ", " << res.reason() << "\n";
        });

    auto future = async_http_request(ioc, host, port, target, version, net::use_future);

    ioc.run();
    try {
        std::cout << "From future: " << future.get().reason() << "\n";
    } catch (boost::system::system_error const& se) {
        std::cout << "future exception: " << se.code().message() << std::endl;
    }
}

成功和失败请求的输出:

$ ./sotest www.example.com 80 / 1.1
From callback: Success, OK
From coro: Success, OK
From coro (try/catch): OK
From future: OK

$ ./sotest www.example.com 81 / 1.1
From callback: The socket was closed due to a timeout, connect
coro exception: The socket was closed due to a timeout
From coro: The socket was closed due to a timeout, connect
From future: future exception: The socket was closed due to a timeout

$ ./sotest www.example.cough 80 / 1.1
From callback: Host not found (authoritative), resolve
coro exception: Host not found (authoritative)
From coro: Host not found (authoritative), resolve
From future: future exception: Host not found (authoritative)

$ ./sotest www.example.com rhubarb / 1.1
From callback: Service not found, resolve
coro exception: Service not found
From coro: Service not found, resolve
From future: future exception: Service not found

请注意,由于所有内容都是异步运行,因此超时示例当然总共要运行约30秒。

© www.soinside.com 2019 - 2024. All rights reserved.