一个协程函数在多个链/线程上工作安全吗?

问题描述 投票:0回答:1

动机

我正在使用 C++20 协程编写消息代理。代理有一些会话(连接)。 经纪人传递的消息如下:

sender(ss)       broker  receiver(sr)
   |               |           |
   | message       |           |
   |-------------->|           |
   |               | message   |
   |               |---------->|
   |               |           |

发送过程由发送者会话(ss)触发。在此过程中,需要访问接收者会话(ss)。 ss 和 sr 可以在不同的线程和不同的strand上工作。如何很好地切换股线

演示情况的最小代码示例

#include <boost/asio.hpp>
#include <iostream>
#include <syncstream>
#include <thread>
#include <cassert>

namespace as = boost::asio;

template <typename Executor>
struct session {
    session(Executor exe):str_{exe} {}
    void access() const {
        std::osyncstream(std::cout) << std::this_thread::get_id() << " access()" << std::endl;
        assert(str_.running_in_this_thread());
    }
    as::strand<Executor> str_;
};

int main() {
    as::io_context iocs;
    as::io_context iocr;
    auto guards = as::make_work_guard(iocs.get_executor());
    auto guardr = as::make_work_guard(iocr.get_executor());
    session ss{iocs.get_executor()};
    session sr{iocr.get_executor()};

    co_spawn(
        ss.str_,  // sender's strand
        [&] () -> as::awaitable<void> {
            std::osyncstream(std::cout) 
                << std::this_thread::get_id() 
                << " sender start deliver" 
                << std::endl;
            // need to access to receiver's resource to deliver
#if 0
            sr.access(); // assertion failed because this coroutine is
                         // working on sender's strand, not receiver's one
#endif
            // so dispatch to the receiver's strand
            co_await as::dispatch(sr.str_, as::use_awaitable);
#if 0
                         // B
            sr.access(); // assertion failed because this coroutine is
                         // working on sender's strand, not receiver's one
#endif
            // so dispatch to the receiver's strand
            co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));

            sr.access(); // working on receiver's thread. 
                         // This is expected behavior. 

            guards.reset();
            guardr.reset();
            co_return;
        },
        as::detached
    );
    std::thread ts {
        [&] {
            std::osyncstream(std::cout) << std::this_thread::get_id() << " sender thread" << std::endl;
            iocs.run();
        }
    };
    std::thread tr {
        [&] {
            std::osyncstream(std::cout) << std::this_thread::get_id() << " receiver thread" << std::endl;
            iocr.run();
        }
    };
    ts.join();
    tr.join();
}

godbolt 链接:https://godbolt.org/z/z8r6fYjj3

session
具有链
str_
access()
功能。
access()
函数期望在
str_
上调用该函数以消除资源锁定。

co_spawn
本身正在由发送者链工作。

我尝试了什么

直接调用接收者的access()

导致断言失败。这是显而易见的,因为代码在发送者的链上工作,而不是接收者的链。

使用以接收者的链作为第一个参数的调度(Executor)

仍然导致断言失败。 我不清楚。我猜想该进程已切换到接收者的线程,但 co_await 切换回发送者的线程,因为 co_spawn 的执行器是发送者的线程。它用作默认执行器。这只是我的猜测。

将dispatch与bind_executor一起使用,其第一个参数(绑定执行器)是接收者的strand

它按我的预期工作。

            // so dispatch to the receiver's strand
            co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));

            sr.access(); // working on receiver's thread. 
                         // This is expected behavior. 

即使 co_spawn 的执行者是发送者的链,co_await 之后的代码部分也在接收者的链上工作。

但是,我不确定它是否安全。我怀疑这可能是未定义行为的一种表现。

如果安全的话。我想使用这种方法。

另一种方法

我阅读了字符服务器的代码示例。

https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/example/cpp20/coroutines/chat_server.cpp 这很有趣。该会话有两个协程:reader() 和 writer()。 writer() 的 async_write() 是通过取消从未触发的计时器来触发的。这有点棘手。在使用队列完成 aync 过程之前,可以很好地避免连续的 async_write() 调用。

但是,就我而言,会话内部已经有排队机制。所以这个技巧对我来说太过分了。

总结问题

问题1

以下方法安全吗?我是不是错过了什么重要的事情?

co_spawn(
    base_strand,
    coroutine_function
);
as::awaitable<void>
coroutine_function() {
    // basically working on base_strand.
    // ...

    co_await some_async_func(..., bind_executor(strand1, as::use_awaitable));
    // this part is working on strand1
    // ...

    co_await some_async_func(..., bind_executor(strand2, as::use_awaitable));
    // this part is working on strand2
    // ...

    co_await some_async_func(..., as::use_awaitable);
    // this part is working on base_strand
    // ...
}

问题2

co_await as::dispatch(sr.str_, as::use_awaitable);
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));

为什么第一个代码无法在

sr.str_
上运行?

c++ multithreading c++20 boost-asio coroutine
1个回答
0
投票

Q1 - 是的,对于 Asio 来说是安全的。协程是隐式链,因为恢复是处理程序的线性链。您可以在任何地方运行它们,包括在线束上。

Q2 - 这是一个经常发生的混乱,例如使用 boost asio 链作为“互斥体”不适用于协程

另请参阅这些解释:

© www.soinside.com 2019 - 2024. All rights reserved.