我们有一些共享资源:内存池、线程不安全的 API,任您选择。我们希望通过 ASIO 链控制对所述资源的访问。所有访问资源的例程都应该在该链上运行。
我们还使用 C++20 协程并享受它们提供的顺序执行的错觉。
当访问共享资源时,我们希望使用
co_await
暂停协程,切换到受祝福的链,对资源执行任何操作,然后返回到其本机执行器上的协程。
注意事项:
dispatch
技巧”,因为人体工程学很糟糕,而且这是一个等待发生的竞争条件。auto s1 = bind_executor(strand, asio::deferred);
co_await asio::dispatch(s1);
// Access shared resource
co_await asio::dispatch(asio::deferred);
asio::awaitable
中,并可能为不需要的操作分配一个协程框架(通过use_awaitable
)这是我今天早上破解的,显然它不是很好(不使用概念,不转发参数,不允许返回值等),但它说明了我想要的:
(神箭)
static std::atomic_int tid_gen;
thread_local int const tid = tid_gen++;
inline void out(auto const& msg) { std::print("T{:x} {}\n", tid, msg); }
template <typename F, typename Ex, typename CT>
auto async_run_on(F f, Ex ex, CT&& token) {
return asio::async_initiate<CT, void()>(
[](auto handler, F f, Ex ex) {
ex.dispatch(
[=, handler = std::move(handler)]() mutable {
std::invoke(f);
handler.get_executor().execute(std::move(handler));
},
asio::get_associated_allocator(ex));
},
token, std::move(f), ex);
}
asio::awaitable<void> f(auto strand) {
out("Main");
co_await async_run_on([](){ out("Strand"); }, strand, asio::deferred);
out("Main again");
}
int main() {
asio::io_context io;
asio::thread_pool tp(1);
co_spawn(io, f(make_strand(tp)), asio::detached);
io.run();
}
希望不言而喻的是,我们可以扩展它来构建始终在给定执行器上运行并返回到任何地方的可调用等待对象。
这个用例应该如何工作?
我的ASIO很差,最差的。我是第一个遇到这个问题的可能性为零。这让我对我提出的任何解决方案都非常怀疑。协程和链是 ASIO 构建块,我不是 sehe 或 ChrisK,我不是弄清楚这些拼图如何组合在一起的人。但是 Google、SO 和 ASIO 文档对此都保持奇怪的沉默。对于链给出的示例和对于协程给出的示例之间几乎没有交叉授粉。
我们是否应该每次都进行
co_await
'ing 链绑定延迟操作,或者是否有一个我完全错过的自由函数?
然后,突然出现了一个明显的转变:
突然我们把功能需求和界面设计的领域留给了实现机制。幸运的是,您的优秀示例通过实际提供抽象来恢复平衡当访问共享资源时,我们希望使用 co_await 挂起协程,切换到受祝福的链,对资源执行任何操作,然后返回到其本机执行器上的协程。
async_run_on
,这与我要建议的内容接近。 回到功能需求:
“内存池、线程不安全的 API,任君选择[...]所有访问资源的例程都应该在该链上运行。”
如果我的任务是设计该资源的 Asio 接口,我会将其建模为 IO 对象。 IO 对象应该绑定到其指定的执行器(在这种情况下通常是一个链),并且成员启动函数应该分派给它(就像async_run_on
,除了执行器不再是参数而是成员)。所以,也许类似于你的代码,我会做类似的事情:
template <typename Executor> struct base_resource {
using executor_type = Executor;
executor_type get_executor() const { return ex_; }
base_resource(Executor ex) : ex_(std::move(ex)) {}
template <typename CT> auto async_foo(CT&& token) {
return asio::async_initiate<CT, void()>(
[this](auto handler) {
trace("initiating");
asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); });
},
token);
}
private:
void do_foo(auto handler) {
trace("doing foo");
// complete with handler; dispatch to bound executor
asio::dispatch(std::move(handler));
}
Executor ex_;
};
现在您可以使用它而不会泄露实现细节:
using Resource = base_resource<asio::any_io_executor>;
asio::awaitable<void> f(Resource& res) {
trace("f invoke resource");
co_await res.async_foo(asio::deferred);
trace("f back");
}
如果您愿意,您仍然可以控制延续应保留在资源的执行器上:
trace("f force stay on resource strand: ");
co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred));
trace("f (strand should have stuck)");
co_await post(asio::deferred); // relinquish strand
trace("f back");
完整演示#include <boost/asio.hpp>
#include <fmt/core.h>
namespace asio = boost::asio;
using namespace std::chrono_literals;
static std::atomic_int tid_gen;
thread_local int const tid = tid_gen++;
std::optional<asio::any_io_executor> g_strand;
static inline void trace(auto const& msg) {
std::string_view tag = "other";
if (auto* s = g_strand->target<asio::strand<asio::io_context::executor_type>>();
s && s->running_in_this_thread())
tag = "on io_context strand";
if (auto* s = g_strand->target<asio::strand<asio::thread_pool::executor_type>>();
s && s->running_in_this_thread())
tag = "on thread_pool strand";
fmt::print("T{:x} {} [{}]\n", tid, tag, msg);
}
template <typename Executor> struct base_resource {
using executor_type = Executor;
executor_type get_executor() const { return ex_; }
base_resource(Executor ex) : ex_(std::move(ex)) {}
template <typename CT> auto async_foo(CT&& token) {
return asio::async_initiate<CT, void()>(
[this](auto handler) {
trace("initiating");
asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); });
},
token);
}
private:
void do_foo(auto handler) {
trace("doing foo");
// complete with handler; dispatch to bound executor
asio::dispatch(std::move(handler));
}
Executor ex_;
};
using Resource = base_resource<asio::any_io_executor>;
asio::awaitable<void> f(Resource& res) {
trace("f invoke resource");
co_await res.async_foo(asio::deferred);
trace("f back");
trace("f force stay on resource strand: ");
co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred));
trace("f (strand should have stuck)");
co_await post(asio::deferred); // relinquish strand
trace("f back");
}
int main() {
trace("Main");
asio::io_context io;
asio::thread_pool tp(1);
g_strand = asio::make_strand(tp);
Resource shared_res{*g_strand};
co_spawn(io, f(shared_res), asio::detached);
io.run();
tp.join();
g_strand.reset();
trace("Done");
}
打印
T0 other [Main]
T0 other [f invoke resource]
T0 other [initiating]
T1 on thread_pool strand [doing foo]
T0 other [f back]
T0 other [f force stay on resource strand: ]
T0 other [initiating]
T1 on thread_pool strand [doing foo]
T1 on thread_pool strand [f (strand should have stuck)]
T0 other [f back]
T0 other [Done]
其中一些将拥有丰富的视角和细节,可以影响您的设计决策: