我想使用协程和
boost::asio
使Websocket在单线程上运行。一个协程负责写入 (async_write
),另一个协程负责读取 (async_read
)。
如果任一协程出现异常(目前我假设每个异常都意味着连接中断),我将尝试重新连接。
为了写作,我想要一个
writeBuffer
作为消息队列。 Websocket 的客户端将调用 ws.Send(data)
,而不是立即发送,而是保留在缓冲区中,直到下一次 ws.Run()
调用。
为了使其按照描述的方式工作,我需要一种方法来在
write
为空时暂停 writeBuffer
协程。如果我不这样做,它会永远旋转,等待缓冲区填充。但是当我尝试使用 std::suspend_always{}
: 暂停它时,会出现错误
error C2665: 'boost::asio::detail::awaitable_frame_base<Executor>::await_transform': no overloaded function could convert all the argument types
所以我想这不是我用
asio::awaitable
暂停协程的方式。
我确实需要这个代理缓冲区作为我的消息的队列。我可能可以使用 boost 中的其他东西 - 也许 signals
也提供了一种 co_await
的方法,但我担心我必须再问 3 个问题才能理解这些。
这是我的代码精简到最小:
#include <iostream>
#include <coroutine>
#include <optional>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
struct CoroWebsocket {
CoroWebsocket(std::string host, std::string port)
: _host(std::move(host))
, _port(std::move(port)) {
asio::co_spawn(_ws.get_executor(), do_run(), asio::detached);
}
void Run() {
_ioc.run_for(50ms);
}
void Write(std::string data) {
// TODO: mutex
_writeBuffer.push_back(std::move(data));
}
std::optional<std::string> Read(){
// TODO: mutex
if (_readBuffer.empty())
return {};
const auto message = _readBuffer.back();
_readBuffer.pop_back();
return message;
}
private:
const std::string _host, _port;
using tcp = asio::ip::tcp;
std::vector<std::string> _writeBuffer; // Will be filled externally.
std::vector<std::string> _readBuffer;
boost::asio::io_context _ioc;
websocket::stream<tcp::socket> _ws{_ioc};
asio::awaitable<void> do_run() {
while(true) {
try {
co_await do_connect();
co_await asio::co_spawn(_ws.get_executor(), do_write() || do_read(), asio::use_awaitable); // If either ends, it must've been an exception. Reconnect.
} catch (const boost::system::system_error& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
asio::awaitable<void> do_connect() {
try {
while(true) {
co_await async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), asio::use_awaitable);
_ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
}));
co_await _ws.async_handshake(_host + ':' + _port, "/", asio::use_awaitable);
_readBuffer.clear();
}
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
asio::awaitable<void> do_write() {
try {
while(true) {
while (_writeBuffer.empty()) {
co_await std::suspend_always{}; // I want to switch context but ERROR
}
for (const auto& message : _writeBuffer) {
co_await _ws.async_write(boost::asio::buffer(message), asio::use_awaitable);
}
_writeBuffer.clear();
}
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
asio::awaitable<void> do_read() {
try {
while(true) {
if (0 != co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable)) {
while (!_ws.is_message_done()) {
co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable);
}
// Signal new message.
}
}
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
};
理想情况下,我还想用 1 个线程的
io_context
替换 thread_pool
,这样我就可以真正分离地运行这些协程。然而,如果我只是让它们在一个单独的线程上旋转,如果没有读取或写入要做,我认为线程会在两者都挂起时在切换上下文时消耗周期。为了解决这个问题,我考虑添加一个第三个协程到混合中,如果最后一次旋转没有读取或写入,它会用基本的 this_thread::sleep(50ms)
来停止线程。
我想要一个在单线程上运行的 Websocket,以便在读取和写入时提供服务。如果无事可做 - 检查工作而不燃烧周期
你很幸运,这正是异步完成的意义所在。
如果无事可做,则什么也不会发生,池中的线程将有效地休眠,这意味着系统中的其他进程可以轮流使用。不同之处在于,它不是“裸睡眠”,而是“智能睡眠”:一旦出现任何相关 IO 事件,睡眠就会被唤醒。这可以是 Asio 服务支持的任何事件,例如文件、管道、UNIX 或互联网套接字、串行端口、异步进程信号。
关于代码,首先让我指出生成时的等待:
co_await asio::co_spawn(
_ws.get_executor(), do_write() || do_read(),
asio::use_awaitable);
是一种可能成本更高的直接写入方式
co_await (do_write() || do_read()); // idiomatic full-duplex!
还有
co_await std::suspend_always{}; // I want to switch context but ERROR
也总是次优,但如果你坚持的话可以
co_await post(asio::deferred);
我会这样做,以便仅当有东西排队时才启动写循环。这立即使得使用链同步对队列的访问变得微不足道。
或者将
channel
视为队列的替代品。写入线程可以从通道异步接收。它还可以让您控制队列容量:https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/overview/channels.html
通道在这里似乎是更好的主意,因为它还允许您自然且最佳地编写
Read
界面。
使用渠道似乎是一个不错的搭配。这一切都归结为这个本质:
const std::string _host, _port;
Stream _ws;
Channel _in{_ws.get_executor()}, _out{_ws.get_executor()};
Task do_run() {
while (true) {
try {
co_await do_connect();
co_await (do_write_loop() || do_read_loop());
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
Task do_write_loop() {
for (;;)
co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
}
Task do_read_loop() {
for (Message msg;; msg.clear()) {
auto buf = asio::dynamic_buffer(msg);
auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
co_await _in.async_send(ec, std::move(msg));
}
}
添加正常关闭标志和连接失败时的后退延迟:
Task do_run() {
for (; !_close_requested; co_await delay(200ms)) {
try {
co_await do_connect();
co_await (do_write_loop() || do_read_loop());
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/beast.hpp>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
using boost::system::error_code;
using tcp = asio::ip::tcp;
struct Server {
using Message = std::string;
using Task = asio::awaitable<void>;
using Channel = asio::deferred_t::as_default_on_t< //
asio::experimental::concurrent_channel<void(error_code, Message)>>;
using Stream = asio::deferred_t::as_default_on_t<websocket::stream<tcp::socket>>;
using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;
using Opts = websocket::stream_base;
Server(asio::any_io_executor ex, std::string host, std::string port)
: _host(std::move(host))
, _port(std::move(port))
, _ws(ex) {
co_spawn(ex, do_run(), asio::detached);
}
bool Write(std::string data) { return _out.try_send(error_code{}, std::move(data)); }
std::optional<std::string> Read() {
Message ret;
if (_in.try_receive([&](error_code ec, Message msg_) {
if (ec)
throw boost::system::system_error(ec);
ret = std::move(msg_);
}))
return ret;
return std::nullopt;
}
void close() {
_close_requested = true;
post(_ws.get_executor(), [this] {
if (error_code ignore; _ws.is_open())
_ws.close({}, ignore);
});
}
private:
const std::string _host, _port;
Stream _ws;
Channel _in{_ws.get_executor(), 10}, _out{_ws.get_executor(), 10};
std::atomic_bool _close_requested{false};
Task delay(auto duration_or_timepoint) {
auto ex = co_await asio::this_coro::executor;
co_await asio::steady_timer(ex, duration_or_timepoint).async_wait(asio::deferred);
}
Task do_run() {
for (; !_close_requested; co_await delay(200ms)) {
try {
co_await do_connect();
co_await (do_write_loop() || do_read_loop());
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
Task do_write_loop() {
for (;;)
co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
}
Task do_read_loop() {
for (Message msg;; msg.clear()) {
auto buf = asio::dynamic_buffer(msg);
auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
co_await _in.async_send(ec, std::move(msg));
if (ec)
break;
}
}
asio::awaitable<void> do_connect() {
auto ex = co_await asio::this_coro::executor;
if (error_code ignore; _ws.is_open()) {
_ws.close({}, ignore);
_in.reset();
_out.reset();
}
auto eps = co_await Resolver(ex).async_resolve(_host, _port);
co_await async_connect(beast::get_lowest_layer(_ws), eps);
_ws.set_option(Opts::decorator([](websocket::request_type& req) {
req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
}));
co_await _ws.async_handshake(_host + ':' + _port, "/");
}
};
int main() {
asio::thread_pool ioc;
Server s(make_strand(ioc), "localhost", "8989");
using std::this_thread::sleep_for;
for (auto msg : {"foo", "bar", "qux"}) {
s.Write(msg);
sleep_for(1s);
while (auto response = s.Read())
std::cout << "Received response " << quoted(*response) << std::endl;
}
s.close();
ioc.join();
}
现场演示反对:
websocketd --port 8989 -- \
bash -c 'tee log | while read line; do echo "Responding to ($line)"; done'
¹ 以及一些特定于平台的东西,例如 Windows 上的完成端口