ASIO:co_await 可调用在链上运行

问题描述 投票:0回答:1

问题

我们有一些共享资源:内存池、线程不安全的 API,任您选择。我们希望通过 ASIO 链控制对所述资源的访问。所有访问资源的例程都应该在该链上运行。

我们还使用 C++20 协程并享受它们提供的顺序执行的错觉。

当访问共享资源时,我们希望使用

co_await
暂停协程,切换到受祝福的链,对资源执行任何操作,然后返回到其本机执行器上的协程。

注意事项:

  • 我们不想使用“
    dispatch
    技巧”,因为人体工程学很糟糕,而且这是一个等待发生的竞争条件。
auto s1 = bind_executor(strand, asio::deferred);
co_await asio::dispatch(s1);
// Access shared resource
co_await asio::dispatch(asio::deferred);
  • 我们不想将上述技巧包装在另一个
    asio::awaitable
    中,并可能为不需要的操作分配一个协程框架(通过
    use_awaitable

当前的解决方案

这是我今天早上破解的,显然它不是很好(不使用概念,不转发参数,不允许返回值等),但它说明了我想要的:

神箭

static std::atomic_int tid_gen;
thread_local int const tid = tid_gen++;

inline void out(auto const& msg) { std::print("T{:x} {}\n", tid, msg); }

template <typename F, typename Ex, typename CT>
auto async_run_on(F f, Ex ex, CT&& token) {
  return asio::async_initiate<CT, void()>(
      [](auto handler, F f, Ex ex) {
        ex.dispatch(
            [=, handler = std::move(handler)]() mutable {
              std::invoke(f);
              handler.get_executor().execute(std::move(handler));
            },
            asio::get_associated_allocator(ex));
      },
      token, std::move(f), ex);
}

asio::awaitable<void> f(auto strand) {
    out("Main");

    co_await async_run_on([](){ out("Strand"); }, strand, asio::deferred);
    
    out("Main again");
}

int main() {
    asio::io_context io;
    asio::thread_pool tp(1);

    co_spawn(io, f(make_strand(tp)), asio::detached);
    io.run();
}

希望不言而喻的是,我们可以扩展它来构建始终在给定执行器上运行并返回到任何地方的可调用等待对象。

问题

这个用例应该如何工作?

我的ASIO很差,最差的。我是第一个遇到这个问题的可能性为零。这让我对我提出的任何解决方案都非常怀疑。协程和链是 ASIO 构建块,我不是 sehe 或 ChrisK,我不是弄清楚这些拼图如何组合在一起的人。

但是 Google、SO 和 ASIO 文档对此都保持奇怪的沉默。对于链给出的示例和对于协程给出的示例之间几乎没有交叉授粉。

我们是否应该每次都进行

co_await

'ing 链绑定延迟操作,或者是否有一个我完全错过的自由函数?

c++ boost boost-asio c++-coroutine
1个回答
0
投票
我真的很欣赏你前两段中清晰的介绍。他们专注于功能性目标。

然后,突然出现了一个明显的转变:

当访问共享资源时,我们希望使用 co_await 挂起协程,切换到受祝福的链,对资源执行任何操作,然后返回到其本机执行器上的协程。

突然我们把功能需求和界面设计的领域留给了实现机制。幸运的是,您的优秀示例通过实际提供抽象来恢复平衡

async_run_on

,这与我要建议的内容
接近

回到功能需求:

“内存池、线程不安全的 API,任君选择[...]所有访问资源的例程都应该在该链上运行。”

如果我的任务是设计该资源的 Asio 接口,我会将其建模为 IO 对象。 IO 对象应该绑定到其指定的执行器(在这种情况下通常是一个链),并且成员启动函数应该分派给它(就像

async_run_on

,除了执行器不再是参数而是成员)。

所以,也许类似于你的代码,我会做类似的事情:

template <typename Executor> struct base_resource { using executor_type = Executor; executor_type get_executor() const { return ex_; } base_resource(Executor ex) : ex_(std::move(ex)) {} template <typename CT> auto async_foo(CT&& token) { return asio::async_initiate<CT, void()>( [this](auto handler) { trace("initiating"); asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); }); }, token); } private: void do_foo(auto handler) { trace("doing foo"); // complete with handler; dispatch to bound executor asio::dispatch(std::move(handler)); } Executor ex_; };
现在您可以使用它而不会泄露实现细节:

using Resource = base_resource<asio::any_io_executor>; asio::awaitable<void> f(Resource& res) { trace("f invoke resource"); co_await res.async_foo(asio::deferred); trace("f back"); }
如果您愿意,您仍然可以控制延续应保留在资源的执行器上:

trace("f force stay on resource strand: "); co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred)); trace("f (strand should have stuck)"); co_await post(asio::deferred); // relinquish strand trace("f back");
完整演示

住在Coliru

#include <boost/asio.hpp> #include <fmt/core.h> namespace asio = boost::asio; using namespace std::chrono_literals; static std::atomic_int tid_gen; thread_local int const tid = tid_gen++; std::optional<asio::any_io_executor> g_strand; static inline void trace(auto const& msg) { std::string_view tag = "other"; if (auto* s = g_strand->target<asio::strand<asio::io_context::executor_type>>(); s && s->running_in_this_thread()) tag = "on io_context strand"; if (auto* s = g_strand->target<asio::strand<asio::thread_pool::executor_type>>(); s && s->running_in_this_thread()) tag = "on thread_pool strand"; fmt::print("T{:x} {} [{}]\n", tid, tag, msg); } template <typename Executor> struct base_resource { using executor_type = Executor; executor_type get_executor() const { return ex_; } base_resource(Executor ex) : ex_(std::move(ex)) {} template <typename CT> auto async_foo(CT&& token) { return asio::async_initiate<CT, void()>( [this](auto handler) { trace("initiating"); asio::dispatch(ex_, [this, h = std::move(handler)]() mutable { do_foo(std::move(h)); }); }, token); } private: void do_foo(auto handler) { trace("doing foo"); // complete with handler; dispatch to bound executor asio::dispatch(std::move(handler)); } Executor ex_; }; using Resource = base_resource<asio::any_io_executor>; asio::awaitable<void> f(Resource& res) { trace("f invoke resource"); co_await res.async_foo(asio::deferred); trace("f back"); trace("f force stay on resource strand: "); co_await res.async_foo(bind_executor(res.get_executor(), asio::deferred)); trace("f (strand should have stuck)"); co_await post(asio::deferred); // relinquish strand trace("f back"); } int main() { trace("Main"); asio::io_context io; asio::thread_pool tp(1); g_strand = asio::make_strand(tp); Resource shared_res{*g_strand}; co_spawn(io, f(shared_res), asio::detached); io.run(); tp.join(); g_strand.reset(); trace("Done"); }
打印

T0 other [Main] T0 other [f invoke resource] T0 other [initiating] T1 on thread_pool strand [doing foo] T0 other [f back] T0 other [f force stay on resource strand: ] T0 other [initiating] T1 on thread_pool strand [doing foo] T1 on thread_pool strand [f (strand should have stuck)] T0 other [f back] T0 other [Done]


相关问题链接:

其中一些将拥有丰富的视角和细节,可以影响您的设计决策:

  • Boost ASIO:什么执行器与默认完成令牌关联?他们应该在本地进行表演吗?
  • 如何在提供阻塞和异步功能的库中使用 asio::strand
  • 如何让 asio eventloop 从另一个线程调用 lambda?
  • 一个协程函数在多个链/线程上工作是否安全?它链接回使用 boost asio 链作为“互斥体”不适用于协程
© www.soinside.com 2019 - 2024. All rights reserved.