对 boost asio 中的 async_compose 函数感到困惑

问题描述 投票:0回答:1

我试图学习一些关于基于boost asio编写自己的异步函数的知识,所以我尝试实现一个异步ntp查询函数:

#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

#include <boost/asio/buffer.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/system/error_code.hpp>

namespace asio = boost::asio;
namespace sys  = boost::system;
using asio::ip::udp;

// Address of one NTP server.
static udp::endpoint ntp_server_ep =
    udp::endpoint(asio::ip::address_v4::from_string("203.107.6.88"), 123);

// Fixed NTP request message.
static std::array<uint8_t, 48> ntp_req_message{
    0x1B, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};

void hexdump(const void* data, std::size_t size)
{
    const uint8_t* pdata = (const uint8_t*) data;

    for (std::size_t n = 0; n < size; ++n) {
        std::printf("%02x ", pdata[n]);
    }
    std::printf("\n");
}

// #include <boost/asio/yield.hpp>
struct async_ntp_query_impl
{
    udp::socket& socket_;
    asio::coroutine coro_;
    udp::endpoint local_ep_;
    std::unique_ptr<uint8_t[]> buffer_{nullptr};

    template<typename Self>
    void operator()(Self& self,
                    sys::error_code err = sys::error_code(),
                    std::size_t n       = 0)
    {
        BOOST_ASIO_CORO_REENTER(coro_)
        {
            BOOST_ASIO_CORO_YIELD socket_.async_send_to(
                asio::buffer(ntp_req_message), ntp_server_ep, std::move(self));
            if (err) {
                std::printf("async_send_to() error: %s\n",
                            err.message().data());
                break;
            }

            BOOST_ASIO_CORO_YIELD socket_.async_receive_from(
                asio::buffer(buffer_.get(), 1024), local_ep_, std::move(self));
            if (err) {
                std::printf("async_receive_from() error: %s\n",
                            err.message().data());
            } else {
                std::printf("async_receive_from() ok: n=%ld\n", n);
                hexdump(buffer_.get(), n);
            }
            break;
        }

        self.complete(err);
    }
};
// #include <boost/asio/unyield.hpp>

template<typename CompletionToken>
auto async_ntp_query(udp::socket& socket, CompletionToken&& token)
{
    std::unique_ptr<uint8_t[]> buffer{new uint8_t[1024]};

    return asio::async_compose<CompletionToken, void(sys::error_code)>(
        async_ntp_query_impl{
            socket, asio::coroutine(), udp::endpoint(), std::move(buffer)},
        token,
        socket);
}

int main()
{
    asio::io_context ioctx;

    udp::socket socket(ioctx, udp::endpoint(udp::v4(), 0));
    async_ntp_query(socket, [](sys::error_code err) {
        if (err) {
            std::printf("async_ntp_query() error: %s\n", err.message().data());
        } else {
            // I don't known why this line print three times???
            std::printf("async_ntp_query() ok\n");
        }
    });

    ioctx.run();
    return 0;
}

不知道为什么回调被调用了3次,有什么问题吗?该演示代码取自asio的async_compose函数文档。我找不到与此功能相关的更多文档或示例。有人可以给我一些建议吗,谢谢。

c++ asynchronous udp boost-asio
1个回答
0
投票

您的

self.complete()
呼叫超出了重新进入限制范围。这会导致虚假调用。

接下来是 UB,因为您的实现对象会移动,但您保留对

local_ep
的引用。您当然可以使其成为自己的
unique_ptr
,但代价是更多的分配。我建议将它们结合起来:

住在Coliru

#include <boost/asio.hpp>
#include <iomanip>
#include <iostream>
#include <span>

namespace { // DEBUG
    using namespace std::chrono_literals;

    static std::mutex      console_mx;
    static std::atomic_int tid_gen = 0;
    thread_local int const tid     = [] { return ++tid_gen; }();
    auto constexpr now             = std::chrono::steady_clock::now;
    static auto const start        = now();

    static inline void trace(auto const&... msg) {
        std::lock_guard lk(console_mx);
        std::cerr << "at" << std::setw(7) << (now() - start) / 1ms << "ms, tid:" << tid << " ";
        (std::cerr << ... << msg) << std::endl;
    }

    static void hexdump(std::span<uint8_t const> data, size_t n) {
        std::ostringstream oss;
        oss << std::hex << std::setfill('0');
        for (int ch : data.subspan(0, n))
            oss << std::setw(2) << ch << ' ';
        trace(oss.str());
    }
} // namespace

namespace asio = boost::asio;
namespace sys  = boost::system;
using asio::ip::udp;

// Address of one NTP server.
static udp::endpoint ntp_server_ep =
    udp::endpoint(asio::ip::address_v4::from_string("203.107.6.88"), 123);

// Fixed NTP request message.
inline static std::array<uint8_t, 48> ntp_req_message{
    0x1B, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};

struct async_ntp_query_impl {
    udp::socket& socket_;
    struct State {
        asio::coroutine           coro_;
        udp::endpoint             local_ep_;
        std::array<uint8_t, 1024> buffer_;
    };
    std::unique_ptr<State> state_ = std::make_unique<State>();

    template <typename Self> void operator()(Self& self, sys::error_code err = {}, size_t n = 0) {
        auto& [coro, local_ep, buffer] = *state_;
        BOOST_ASIO_CORO_REENTER(coro) {
            BOOST_ASIO_CORO_YIELD socket_.async_send_to(asio::buffer(ntp_req_message), ntp_server_ep,
                                                        std::move(self));
            if (err) {
                trace("async_send_to() error: ", err.message());
                goto completion;
            }

            BOOST_ASIO_CORO_YIELD socket_.async_receive_from(asio::buffer(buffer), local_ep,
                                                             std::move(self));
            if (err) {
                trace("async_receive_from() error: ", err.message());
            } else {
                trace("async_receive_from() ok: n=", n);
                hexdump(buffer, n);
            }

completion:
            self.complete(err);
        }
    }
};

template<typename CompletionToken>
auto async_ntp_query(udp::socket& socket, CompletionToken&& token)
{
    auto buffer = std::make_unique<uint8_t[]>(1024);

    return asio::async_compose<CompletionToken, void(sys::error_code)>(
        async_ntp_query_impl{socket}, token, socket);
}

int main() {
    asio::io_context ioctx;

    udp::socket s{ioctx, udp::endpoint(udp::v4(), 0)};
    async_ntp_query(s, [](sys::error_code err) {
        if (err) {
            trace("async_ntp_query() completion error: ", err.message());
        } else {
            trace("async_ntp_query() completion ok");
        }
    });

    ioctx.run();
}

完成后,打印例如

at    232ms, tid:1 async_receive_from() ok: n=48
at    232ms, tid:1 1c 02 00 e7 00 00 02 f4 00 00 00 44 64 6b 19 72 e8 97 09 e3 b3 c5 a4 a9 00 00 00 00 00 00 00 00 e8 97 09 fe 5e ff fc 8c e8 97 09 fe 5f 00 de 1c 
at    232ms, tid:1 async_ntp_query() completion ok

请注意,您没有超时逻辑,并且 UDP 没有保证传送。因此,程序可能会无限期地等待。

© www.soinside.com 2019 - 2024. All rights reserved.