C++20 协程读/写 websocket

问题描述 投票:0回答:1

我想使用协程和

boost::asio
使Websocket在单线程上运行。一个协程负责写入 (
async_write
),另一个协程负责读取 (
async_read
)。

如果任一协程出现异常(目前我假设每个异常都意味着连接中断),我将尝试重新连接。

为了写作,我想要一个

writeBuffer
作为消息队列。 Websocket 的客户端将调用
ws.Send(data)
,而不是立即发送,而是保留在缓冲区中,直到下一次
ws.Run()
调用。

为了使其按照描述的方式工作,我需要一种方法来在

write
为空时暂停
writeBuffer
协程。如果我不这样做,它会永远旋转,等待缓冲区填充。但是当我尝试使用
std::suspend_always{}
:

暂停它时,会出现错误
error C2665: 'boost::asio::detail::awaitable_frame_base<Executor>::await_transform': no overloaded function could convert all the argument types

所以我想这不是我用

asio::awaitable
暂停协程的方式。 我确实需要这个代理缓冲区作为我的消息的队列。我可能可以使用 boost 中的其他东西 - 也许
signals
也提供了一种
co_await
的方法,但我担心我必须再问 3 个问题才能理解这些。

这是我的代码精简到最小:

#include <iostream>
#include <coroutine>
#include <optional>

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;

struct CoroWebsocket {

    CoroWebsocket(std::string host, std::string port)
    : _host(std::move(host))
    , _port(std::move(port)) {
        asio::co_spawn(_ws.get_executor(), do_run(), asio::detached);
    }

    void Run() {
        _ioc.run_for(50ms);
    }

    void Write(std::string data) {
        // TODO: mutex
        _writeBuffer.push_back(std::move(data));
    }

    std::optional<std::string> Read(){ 
        // TODO: mutex
        if (_readBuffer.empty())
            return {};
        const auto message = _readBuffer.back();
        _readBuffer.pop_back();
        return message;
    }

private:
    const std::string _host, _port;
    using tcp = asio::ip::tcp;
    std::vector<std::string>       _writeBuffer; // Will be filled externally.
    std::vector<std::string>       _readBuffer;
    boost::asio::io_context        _ioc;
    websocket::stream<tcp::socket> _ws{_ioc};

    asio::awaitable<void> do_run() {
        while(true) {
            try {
                co_await do_connect();
                co_await asio::co_spawn(_ws.get_executor(), do_write() || do_read(), asio::use_awaitable); // If either ends, it must've been an exception. Reconnect.
            } catch (const boost::system::system_error& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
        }
    }

    asio::awaitable<void> do_connect() {
        try {
            while(true) {
                co_await async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), asio::use_awaitable);
                _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                    req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
                }));

                co_await _ws.async_handshake(_host + ':' + _port, "/", asio::use_awaitable);
                _readBuffer.clear();
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }

    asio::awaitable<void> do_write() {
        try {
            while(true) {
                while (_writeBuffer.empty()) {
                    co_await std::suspend_always{}; // I want to switch context but ERROR
                }

                for (const auto& message : _writeBuffer) {
                    co_await _ws.async_write(boost::asio::buffer(message), asio::use_awaitable);
                }
                _writeBuffer.clear();
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }

    asio::awaitable<void> do_read() {
        try {
            while(true) {
                if (0 != co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable)) {
                    while (!_ws.is_message_done()) {
                        co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable);
                    }
                    // Signal new message.
                }
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    
    }
};

理想情况下,我还想用 1 个线程的

io_context
替换
thread_pool
,这样我就可以真正分离地运行这些协程。然而,如果我只是让它们在一个单独的线程上旋转,如果没有读取或写入要做,我认为线程会在两者都挂起时在切换上下文时消耗周期。为了解决这个问题,我考虑添加一个第三个协程到混合中,如果最后一次旋转没有读取或写入,它会用基本的
this_thread::sleep(50ms)
来停止线程。

c++ boost boost-asio
1个回答
0
投票

我想要一个在单线程上运行的 Websocket,以便在读取和写入时提供服务。如果无事可做 - 检查工作而不燃烧周期

你很幸运,这正是异步完成的意义所在。

如果无事可做,则什么也不会发生,池中的线程将有效地休眠,这意味着系统中的其他进程可以轮流使用。不同之处在于,它不是“裸睡眠”,而是“智能睡眠”:一旦出现任何相关 IO 事件,睡眠就会被唤醒。这可以是 Asio 服务支持的任何事件,例如文件、管道、UNIX 或互联网套接字、串行端口、异步进程信号。

关于代码,首先让我指出生成时的等待:

    co_await asio::co_spawn(
        _ws.get_executor(), do_write() || do_read(),
        asio::use_awaitable);

是一种可能成本更高的直接写入方式

    co_await (do_write() || do_read()); // idiomatic full-duplex!

还有

    co_await std::suspend_always{}; // I want to switch context but ERROR

也总是次优,但如果你坚持的话可以

    co_await post(asio::deferred);

如何接近

我会这样做,以便仅当有东西排队时才启动写循环。这立即使得使用链同步对队列的访问变得微不足道。

或者将

channel
视为队列的替代品。写入线程可以从通道异步接收。它还可以让您控制队列容量:https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/overview/channels.html

通道在这里似乎是更好的主意,因为它还允许您自然且最佳地编写

Read
界面。

演示

使用渠道似乎是一个不错的搭配。这一切都归结为这个本质:

const std::string _host, _port;
Stream            _ws;
Channel           _in{_ws.get_executor()}, _out{_ws.get_executor()};

Task do_run() {
    while (true) {
        try {
            co_await do_connect();
            co_await (do_write_loop() || do_read_loop());
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }
}

Task do_write_loop() {
    for (;;)
        co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
}

Task do_read_loop() {
    for (Message msg;; msg.clear()) {
        auto buf = asio::dynamic_buffer(msg);

        auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
        co_await _in.async_send(ec, std::move(msg));
    }
}

添加正常关闭标志和连接失败时的后退延迟:

Task do_run() {
    for (; !_close_requested; co_await delay(200ms)) {
        try {
            co_await do_connect();
            co_await (do_write_loop() || do_read_loop());
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }
}

完整列表

住在Coliru

#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/beast.hpp>

namespace asio      = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
using boost::system::error_code;
using tcp = asio::ip::tcp;

struct Server {
    using Message  = std::string;
    using Task     = asio::awaitable<void>;
    using Channel  = asio::deferred_t::as_default_on_t< //
        asio::experimental::concurrent_channel<void(error_code, Message)>>;
    using Stream   = asio::deferred_t::as_default_on_t<websocket::stream<tcp::socket>>;
    using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;
    using Opts     = websocket::stream_base;

    Server(asio::any_io_executor ex, std::string host, std::string port)
        : _host(std::move(host))
        , _port(std::move(port))
        , _ws(ex) {
        co_spawn(ex, do_run(), asio::detached);
    }

    bool Write(std::string data) { return _out.try_send(error_code{}, std::move(data)); }

    std::optional<std::string> Read() {
        Message ret;
        if (_in.try_receive([&](error_code ec, Message msg_) {
                if (ec)
                    throw boost::system::system_error(ec);
                ret = std::move(msg_);
            }))
            return ret;
        return std::nullopt;
    }

    void close() {
        _close_requested = true;
        post(_ws.get_executor(), [this] {
            if (error_code ignore; _ws.is_open())
                _ws.close({}, ignore);
        });
    }

  private:
    const std::string _host, _port;
    Stream            _ws;
    Channel           _in{_ws.get_executor(), 10}, _out{_ws.get_executor(), 10};
    std::atomic_bool  _close_requested{false};

    Task delay(auto duration_or_timepoint) {
        auto ex = co_await asio::this_coro::executor;
        co_await asio::steady_timer(ex, duration_or_timepoint).async_wait(asio::deferred);
    }

    Task do_run() {
        for (; !_close_requested; co_await delay(200ms)) {
            try {
                co_await do_connect();
                co_await (do_write_loop() || do_read_loop());
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
        }
    }

    Task do_write_loop() {
        for (;;)
            co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
    }

    Task do_read_loop() {
        for (Message msg;; msg.clear()) {
            auto buf = asio::dynamic_buffer(msg);

            auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
            co_await _in.async_send(ec, std::move(msg));

            if (ec)
                break;
        }
    }

    asio::awaitable<void> do_connect() {
        auto ex = co_await asio::this_coro::executor;
        if (error_code ignore; _ws.is_open()) {
            _ws.close({}, ignore);
            _in.reset();
            _out.reset();
        }

        auto eps = co_await Resolver(ex).async_resolve(_host, _port);
        co_await async_connect(beast::get_lowest_layer(_ws), eps);

        _ws.set_option(Opts::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        co_await _ws.async_handshake(_host + ':' + _port, "/");
    }
};

int main() {
    asio::thread_pool ioc;
    Server s(make_strand(ioc), "localhost", "8989");

    using std::this_thread::sleep_for;

    for (auto msg : {"foo", "bar", "qux"}) {
        s.Write(msg);
        sleep_for(1s);

        while (auto response = s.Read())
            std::cout << "Received response " << quoted(*response) << std::endl;
    }

    s.close();
    ioc.join();
}

现场演示反对:

 websocketd --port 8989 -- \
      bash -c 'tee log | while read line; do echo "Responding to ($line)"; done'

¹ 以及一些特定于平台的东西,例如 Windows 上的完成端口

© www.soinside.com 2019 - 2024. All rights reserved.