我正在学习如何基于 Boost.Asio 编写自己的异步函数,因此尝试实现了一个异步 NTP 查询函数:
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio/buffer.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/system/error_code.hpp>
namespace asio = boost::asio;
namespace sys = boost::system;
using asio::ip::udp;
// Address of one NTP server (UDP port 123).
// Built with make_address_v4() because address_v4::from_string() is marked
// deprecated in Boost.Asio and is not available in builds that drop
// deprecated APIs.
static udp::endpoint ntp_server_ep =
    udp::endpoint(asio::ip::make_address_v4("203.107.6.88"), 123);
// Fixed 48-byte NTP request message.
// Only the first byte is non-zero; aggregate initialization zero-fills the
// remaining 47 elements, so they are not spelled out here.
// NOTE(review): 0x1B presumably encodes LI=0, VN=3, Mode=3 (client) in the
// first NTP header byte — confirm against RFC 5905.
static std::array<uint8_t, 48> ntp_req_message{0x1B};
// Writes `size` bytes starting at `data` to stdout as two-digit lowercase hex
// values, each followed by a single space, then ends the line with '\n'.
// Nothing is dereferenced when `size` is 0.
void hexdump(const void* data, std::size_t size)
{
    const auto* cursor = static_cast<const uint8_t*>(data);
    const auto* const end = cursor + size;
    while (cursor != end) {
        std::printf("%02x ", *cursor);
        ++cursor;
    }
    std::printf("\n");
}
// #include <boost/asio/yield.hpp>
// Implementation function object handed to asio::async_compose by
// async_ntp_query(). operator() sends ntp_req_message to ntp_server_ep, then
// waits for a reply and hex-dumps it. The object passes `self` as the
// completion handler of each intermediate operation, so operator() is entered
// again with that operation's (error_code, bytes_transferred) result.
struct async_ntp_query_impl
{
// Socket used for both the send and the receive; held by reference, not owned.
udp::socket& socket_;
// Resume-point state consumed by BOOST_ASIO_CORO_REENTER in operator().
asio::coroutine coro_;
// Endpoint object passed to async_receive_from(), which fills in the sender.
// NOTE(review): the pending receive keeps referring to this member while the
// implementation object is moved together with `self`; this has been reported
// as undefined behavior. Heap-allocating this state would keep its address
// stable.
udp::endpoint local_ep_;
// Receive buffer; async_ntp_query() allocates 1024 bytes for it.
std::unique_ptr<uint8_t[]> buffer_{nullptr};
// self: handle to the composed operation; moved into each intermediate
//       operation as its handler and used to signal final completion.
// err:  result of the intermediate operation that just finished (default on
//       the initial call).
// n:    number of bytes transferred by that operation (0 on the initial call).
template<typename Self>
void operator()(Self& self,
sys::error_code err = sys::error_code(),
std::size_t n = 0)
{
BOOST_ASIO_CORO_REENTER(coro_)
{
// Step 1: send the fixed request to the server.
BOOST_ASIO_CORO_YIELD socket_.async_send_to(
asio::buffer(ntp_req_message), ntp_server_ep, std::move(self));
if (err) {
std::printf("async_send_to() error: %s\n",
err.message().data());
// Leaves the REENTER block; control continues at self.complete() below.
break;
}
// Step 2: wait for a datagram of up to 1024 bytes.
BOOST_ASIO_CORO_YIELD socket_.async_receive_from(
asio::buffer(buffer_.get(), 1024), local_ep_, std::move(self));
if (err) {
std::printf("async_receive_from() error: %s\n",
err.message().data());
} else {
// NOTE(review): n is std::size_t, but %ld expects long; %zu is the matching
// conversion specifier.
std::printf("async_receive_from() ok: n=%ld\n", n);
hexdump(buffer_.get(), n);
}
break;
}
// NOTE(review): this statement sits outside the REENTER block, so it runs on
// every entry into operator() — including right after each YIELD above has
// handed `self` to an intermediate operation — instead of only once at the
// end. This is the source of the completion handler firing three times.
self.complete(err);
}
};
// #include <boost/asio/unyield.hpp>
// Starts an asynchronous NTP query on `socket` and reports the outcome through
// `token` with the completion signature void(sys::error_code).
//
// socket: UDP socket used for the request and the reply. The implementation
//         object stores a reference to it, so it must stay alive until the
//         operation has completed.
// token:  completion token forwarded to asio::async_compose.
// Returns whatever asio::async_compose returns for the given token type.
template<typename CompletionToken>
auto async_ntp_query(udp::socket& socket, CompletionToken&& token)
{
    // 1024 bytes matches the size async_ntp_query_impl passes to
    // async_receive_from(). make_unique replaces the naked new[] and
    // additionally zero-initializes the buffer.
    auto buffer = std::make_unique<uint8_t[]>(1024);
    return asio::async_compose<CompletionToken, void(sys::error_code)>(
        async_ntp_query_impl{
            socket, asio::coroutine(), udp::endpoint(), std::move(buffer)},
        token,
        socket);
}
int main()
{
asio::io_context ioctx;
udp::socket socket(ioctx, udp::endpoint(udp::v4(), 0));
async_ntp_query(socket, [](sys::error_code err) {
if (err) {
std::printf("async_ntp_query() error: %s\n", err.message().data());
} else {
// I don't known why this line print three times???
std::printf("async_ntp_query() ok\n");
}
});
ioctx.run();
return 0;
}
不知道为什么回调会被调用 3 次,是代码哪里有问题吗?这段演示代码取自 Asio 的 async_compose 函数文档,但我找不到与该函数相关的更多文档或示例。有人可以给我一些建议吗?谢谢。
您对 self.complete() 的调用位于 BOOST_ASIO_CORO_REENTER 代码块之外,因此每次进入 operator() 时都会执行它,这正是出现多余(虚假)回调的原因。
其次,代码中还存在未定义行为(UB):实现对象会被移动,而异步操作仍然持有对 local_ep_ 的引用。您当然可以把它单独放进一个 unique_ptr,但代价是更多的内存分配。我建议把这些状态合并到一起:
#include <boost/asio.hpp>
#include <iomanip>
#include <iostream>
#include <span>
namespace { // DEBUG
    using namespace std::chrono_literals;

    // Serializes writes to std::cerr across threads.
    static std::mutex console_mx;
    // Counter from which each thread draws its small numeric id.
    static std::atomic_int tid_gen = 0;
    // Per-thread id, taken from tid_gen when the thread_local is initialized.
    thread_local int const tid = [] { return ++tid_gen; }();
    auto constexpr now = std::chrono::steady_clock::now;
    // Reference time point; trace() reports milliseconds elapsed since it.
    static auto const start = now();

    // Writes one line to std::cerr: elapsed milliseconds since `start`, the
    // calling thread's id, then every argument in order. Holds console_mx
    // for the whole line and flushes at the end.
    static inline void trace(auto const&... msg) {
        std::lock_guard lk(console_mx);
        std::cerr << "at" << std::setw(7) << (now() - start) / 1ms << "ms, tid:" << tid << " ";
        (std::cerr << ... << msg) << std::endl;
    }

    // Traces the first `n` bytes of `data` as two-digit hex values separated
    // by spaces. `n` is clamped to data.size(), because asking subspan() for
    // more elements than the span holds is undefined behavior.
    static void hexdump(std::span<uint8_t const> data, size_t n) {
        size_t const count = n < data.size() ? n : data.size();
        std::ostringstream oss;
        oss << std::hex << std::setfill('0');
        // Iterate as int so the stream prints a number, not a character.
        for (int ch : data.subspan(0, count))
            oss << std::setw(2) << ch << ' ';
        trace(oss.str());
    }
} // namespace
namespace asio = boost::asio;
namespace sys = boost::system;
using asio::ip::udp;
// Address of one NTP server (UDP port 123).
// Built with make_address_v4() because address_v4::from_string() is marked
// deprecated in Boost.Asio and is not available in builds that drop
// deprecated APIs.
static udp::endpoint ntp_server_ep =
    udp::endpoint(asio::ip::make_address_v4("203.107.6.88"), 123);
// Fixed 48-byte NTP request message.
// Only the leading byte is given explicitly; the other 47 elements are
// zero-filled by aggregate initialization.
// NOTE(review): 0x1B presumably encodes LI=0, VN=3, Mode=3 (client) in the
// first NTP header byte — confirm against RFC 5905.
inline static std::array<uint8_t, 48> ntp_req_message{0x1B};
// Implementation function object handed to asio::async_compose by
// async_ntp_query(). operator() sends ntp_req_message to ntp_server_ep, waits
// for a reply, traces it as hex and then completes the composed operation.
// `self` is passed as the completion handler of each intermediate operation,
// so operator() is entered again with that operation's result.
struct async_ntp_query_impl {
// Socket used for both the send and the receive; held by reference, not owned.
udp::socket& socket_;
// Everything the pending operations refer to lives in one heap-allocated
// State, so those references stay valid while the implementation object
// itself is moved together with `self`.
struct State {
// Resume-point state consumed by BOOST_ASIO_CORO_REENTER.
asio::coroutine coro_;
// Endpoint object passed to async_receive_from(), which fills in the sender.
udp::endpoint local_ep_;
// Receive buffer for the reply.
std::array<uint8_t, 1024> buffer_;
};
std::unique_ptr<State> state_ = std::make_unique<State>();
// self: handle to the composed operation; moved into each intermediate
//       operation as its handler and used to signal final completion.
// err:  result of the intermediate operation that just finished (default on
//       the initial call).
// n:    number of bytes transferred by that operation (0 on the initial call).
template <typename Self> void operator()(Self& self, sys::error_code err = {}, size_t n = 0) {
// Bind names to the heap-allocated state members for use below.
auto& [coro, local_ep, buffer] = *state_;
BOOST_ASIO_CORO_REENTER(coro) {
// Step 1: send the fixed request to the server.
BOOST_ASIO_CORO_YIELD socket_.async_send_to(asio::buffer(ntp_req_message), ntp_server_ep,
std::move(self));
if (err) {
trace("async_send_to() error: ", err.message());
// Skip the receive and report the send error to the caller.
goto completion;
}
// Step 2: wait for a datagram that fits into the 1024-byte buffer.
BOOST_ASIO_CORO_YIELD socket_.async_receive_from(asio::buffer(buffer), local_ep,
std::move(self));
if (err) {
trace("async_receive_from() error: ", err.message());
} else {
trace("async_receive_from() ok: n=", n);
hexdump(buffer, n);
}
// The completion call is kept inside the REENTER block so that it runs only
// when the coroutine actually reaches this point, not on every entry into
// operator().
completion:
self.complete(err);
}
}
};
// Starts an asynchronous NTP query on `socket` and reports the outcome through
// `token` with the completion signature void(sys::error_code).
//
// socket: UDP socket used for the request and the reply. The implementation
//         object stores a reference to it, so it must stay alive until the
//         operation has completed.
// token:  completion token forwarded to asio::async_compose.
// Returns whatever asio::async_compose returns for the given token type.
template<typename CompletionToken>
auto async_ntp_query(udp::socket& socket, CompletionToken&& token)
{
    // No buffer is allocated here: async_ntp_query_impl creates its own State
    // (including the receive buffer) through its default member initializer.
    return asio::async_compose<CompletionToken, void(sys::error_code)>(
        async_ntp_query_impl{socket}, token, socket);
}
// Demo driver: opens an IPv4 UDP socket bound to port 0, starts one NTP query
// and runs the io_context until no work is left.
int main() {
    asio::io_context ioctx;
    udp::socket s{ioctx, udp::endpoint(udp::v4(), 0)};

    async_ntp_query(s, [](sys::error_code err) {
        if (!err) {
            trace("async_ntp_query() completion ok");
            return;
        }
        trace("async_ntp_query() completion error: ", err.message());
    });

    ioctx.run();
}
运行完成后,会打印类似下面的输出:
at 232ms, tid:1 async_receive_from() ok: n=48
at 232ms, tid:1 1c 02 00 e7 00 00 02 f4 00 00 00 44 64 6b 19 72 e8 97 09 e3 b3 c5 a4 a9 00 00 00 00 00 00 00 00 e8 97 09 fe 5e ff fc 8c e8 97 09 fe 5f 00 de 1c
at 232ms, tid:1 async_ntp_query() completion ok
请注意,您的代码没有超时逻辑,而 UDP 并不保证送达,因此程序可能会无限期地等待下去。