使用 boost asio 并发通道的生产者消费者

问题描述 投票:0回答:0

我的程序在尝试使用 boost asio 通道将数据从生产者发送到消费者时阻塞。

async_send 方法不是异步的。文档说:此方法异步发送数据。

我尝试使用 boost asio 通道在线程之间发送数据来实现观察者设计模式。

但是我有点惊讶 async_send 的行为。


struct Subject

{

    using Channel = asio::experimental::concurrent_channel<void(std::error_code, std::shared_ptr<Response>)>;

    std::list<Channel> channels;

};

asio::awaitable<void> for_each_channel(Subject& subject, auto action)

{

    for (auto it = subject.channels.begin(); it != subject.channels.end();)

    {

        if (it->is_open())

        {

            co_await action(*it);

            ++it;

        }

        else

        {

            it = subject.channels.erase(it);

        }

    }

}

asio::awaitable<void> notify_all(Subject& subject, std::shared_ptr<Response> response)

{

    co_await for_each_channel(subject,

                              [&](Subject::Channel& channel)

                              {

                                  return channel.async_send(std::error_code{}, response, asio::use_awaitable); // blocks here

                              });

}

asio::awaitable<void> close(Subject& subject)

{

    co_await for_each_channel(subject,

                              [&](Subject::Channel& channel)

                              {

                                  return channel.async_send(std::error_code{asio::error::operation_aborted}, nullptr,

                                                            asio::use_awaitable);

                              });

}

auto& add_observer(Subject& subject, auto executor) { return subject.channels.emplace_back(executor); }

void remove_observer(Subject::Channel& observer) { observer.close(); }

asio::awaitable<void> producer(Subject& subject)

{

    for (;;)

    {

        auto data = std::make_shared<Response>();

        co_await notify_all(subject, std::move(data));

    }

    co_await close(subject);

}

asio::awaitable<void> consumer(Subject& subject)

{

    bool ok{true};

    auto& observer = add_observer(subject, co_await asio::this_coro::executor);

    while (ok)

    {

        const auto [ec, response] = co_await observer.async_receive(asio::as_tuple(asio::use_awaitable));

        if (ec)

        {

            break;

        }

        co_await treatment(); // treat the response

}

我的问题是为什么 async_send 不是异步的。

如何避免阻塞生产者线程?

是否有比 boost 文档更有用/有用的 boost asio 频道文档。

c++ boost boost-asio coroutine
© www.soinside.com 2019 - 2024. All rights reserved.