我编写了以下代码来分析单线程应用程序中 Asio 中实验通道的性能。在 [email protected] 上,大约需要 1 秒才能完成,表现出每秒大约 3M 个项目的吞吐量。
该问题可能是由于 asio 处于单线程模式,导致生产者向消费者部分发出信号,并导致消费者协程在每次调用 async_send() 时立即恢复。但是,我不确定如何测试以确认是否是这种情况以及如何在实际应用中避免这种情况。减少通道缓冲区大小,甚至减少到 0,对吞吐量没有影响,这可能是出于同样的原因。
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;
using channel_t = asio::experimental::channel< void(boost::system::error_code, uint64_t) >;
asio::awaitable< void >
producer(channel_t &ch)
{
for (uint64_t i = 0; i < 3'000'000; i++)
co_await ch.async_send(boost::system::error_code {}, i, asio::use_awaitable);
ch.close();
}
asio::awaitable< void >
consumer(channel_t &ch)
{
for (;;)
co_await ch.async_receive(asio::use_awaitable);
}
asio::awaitable< void >
experiment()
{
channel_t ch { co_await asio::this_coro::executor, 1000 };
co_await (consumer(ch) && producer(ch));
}
int
main()
{
asio::io_context ctx {};
asio::co_spawn(ctx, experiment(), asio::detached);
ctx.run();
}
您可以通过提供有关线程的提示来节省一点:
BOOST_ASIO_CONCURRENCY_HINT_UNSAFE
)any_io_executor
替换为您使用的具体执行器类型我编写了一个减少消息数量 (30k) 的并行基准测试,以便 Nonius 可以对 100 次运行进行采样并对结果进行统计分析:
//#define TWEAKS
#ifdef TWEAKS
#define BOOST_ASIO_DISABLE_THREADS 1
#endif
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <iostream>
namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;
using boost::system::error_code;
using context = asio::io_context;
#ifdef TWEAKS
using executor_t = context::executor_type;
using channel_t = asio::experimental::channel<executor_t, void(error_code, uint64_t)>;
#else
using executor_t = asio::any_io_executor;
using channel_t = asio::experimental::channel<void(error_code, uint64_t)>;
#endif
asio::awaitable<void> producer(channel_t& ch) {
for (uint64_t i = 0; i < 30'000; i++)
co_await ch.async_send(error_code {}, i, asio::use_awaitable);
ch.close();
}
asio::awaitable<void> consumer(channel_t& ch) {
for (;;)
co_await ch.async_receive(asio::use_awaitable);
}
asio::awaitable<void> experiment() {
asio::any_io_executor ex = co_await asio::this_coro::executor;
channel_t ch { *ex.target<executor_t>(), 1000 };
co_await (consumer(ch) && producer(ch));
}
void foo() {
try {
#ifdef TWEAKS
asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE};
#else
asio::io_context ctx{1};
#endif
asio::co_spawn(ctx, experiment(), asio::detached);
ctx.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
#include <nonius/benchmark.h++>
#define NONIUS_RUNNER
#include <nonius/main.h++>
NONIUS_BENCHMARK( //
"foo", //
[](nonius::chronometer cm) { cm.measure([] { foo(); }); })
每 30k 批次的结果(包括构造和拆卸)为:
因此速度提高了约 25%,并且方差也大大减少。
将 系列合并到一张图中:
这些只是 Asio 的技术调整。我可能还缺少一些。
我怀疑您应该能够通过智能缓冲获得更好的吞吐量。我假设您出于其他原因需要 Asio 集成,因此这是正确的选择。
事实证明,消费者和生产者双方在每次发送/接收操作上都进行了重新调度;这就是通道大小对吞吐量没有影响的原因。 我对代码做了如下修改,现在每秒可以发送90M。然而,这正是我对实施的期望。
asio::awaitable< void >
producer(channel_t &ch)
{
for (uint64_t i = 0; i < 90'000'000; i++)
{
if (!ch.try_send(boost::system::error_code {}, i))
co_await ch.async_send(boost::system::error_code {}, i, asio::use_awaitable);
}
ch.close();
}
asio::awaitable< void >
consumer(channel_t &ch)
{
for (;;)
{
if (!ch.try_receive([](auto, auto) {}))
co_await ch.async_receive(asio::use_awaitable);
}
}
我相信这不是通道的默认行为的原因是Asio中的awaitables无法在await_ready()调用中返回true,因此它们总是必须挂起并启动异步操作。