我计划拥有 TCP 到 WebSocket 隧道。多个 TCP 连接到一个 WebSocket 连接(我知道这需要多路复用,但一次只做一件事)。这意味着有一个 TCP 套接字在本地侦听并通过 WebSocket 转发从 TCP 套接字接收到的所有内容。
最近在学习Boost Asio和Boost Beast,到目前为止写了以下代码。它还远未完成,但由于我对协程和 Boost 还很陌生,所以我有一些设计问题,
#include "server_certificate.hpp" // Got this from Boost Beast
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/json/src.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <memory>
#include <time.h>
#pragma comment(lib, "crypt32.lib")
#pragma comment(lib, "libssl.lib")
#pragma comment(lib, "libcrypto.lib")
using namespace std;
namespace json = boost::json;
namespace asio = boost::asio;
namespace this_coro = asio::this_coro;
using asio::ip::tcp;
using boost::system::error_code;
using namespace asio::experimental::awaitable_operators;
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
template <typename T> using Defer = asio::deferred_t::as_default_on_t<T>;
using Socket = Defer<tcp::socket>;
using Acceptor = Defer<tcp::acceptor>;
void
fail(beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
/* TCP listening locally */
asio::awaitable<void> tcp_session(Socket socket)
{
auto ep = socket.remote_endpoint();
std::cout << "New TCP session for " << ep << "\n";
asio::streambuf buf;
size_t requests_handled = 0;
for (boost::system::error_code ec; !ec;)
{
auto token = redirect_error(asio::deferred, ec);
auto n = co_await socket.async_read_some(buf.prepare(1024), token);
buf.commit(n);
auto eof = ec == asio::error::eof;
if (buf.size() > 0)
{
requests_handled += 1;
auto n = co_await async_write(socket, buf, token);
buf.consume(n);
}
}
std::cout << "End of " << ep << " TCP session" << endl;
}
asio::awaitable<void> tcp_listener(tcp::endpoint& ep)
{
auto ex = co_await this_coro::executor;
Acceptor acc(ex, ep);
for (;;)
{
co_spawn(ex, tcp_session(co_await acc.async_accept()), asio::detached);
}
}
/* HTTPS WebSocket listening on Internet */
// Echoes back all received WebSocket messages
void do_session(
websocket::stream<beast::ssl_stream<beast::tcp_stream>>&ws,
net::yield_context yield)
{
beast::error_code ec;
// Set the timeout.
beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));
// Perform the SSL handshake
ws.next_layer().async_handshake(ssl::stream_base::server, yield[ec]);
if (ec)
{
return fail(ec, "handshake");
}
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws).expires_never();
// Set suggested timeout settings for the websocket
ws.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-coro-ssl");
}));
// Accept the websocket handshake
ws.async_accept(yield[ec]);
if (ec)
{
return fail(ec, "accept");
}
for (;;)
{
// This buffer will hold the incoming message
beast::flat_buffer buffer;
// Read a message
ws.async_read(buffer, yield[ec]);
// This indicates that the session was closed
if (ec == websocket::error::closed)
{
break;
}
if (ec)
{
return fail(ec, "read");
}
// Echo the message back
ws.text(ws.got_text());
ws.async_write(buffer.data(), yield[ec]);
if (ec)
{
return fail(ec, "write");
}
}
}
void do_listen(
net::io_context& ioc,
ssl::context& ctx,
tcp::endpoint endpoint,
net::yield_context yield)
{
beast::error_code ec;
// Open the acceptor
tcp::acceptor acceptor(ioc);
acceptor.open(endpoint.protocol(), ec);
if (ec)
{
return fail(ec, "open");
}
// Allow address reuse
acceptor.set_option(net::socket_base::reuse_address(true), ec);
if (ec)
{
return fail(ec, "set_option");
}
// Bind to the server address
acceptor.bind(endpoint, ec);
if (ec)
{
return fail(ec, "bind");
}
// Start listening for connections
acceptor.listen(net::socket_base::max_listen_connections, ec);
if (ec)
{
return fail(ec, "listen");
}
for (;;)
{
tcp::socket socket(ioc);
acceptor.async_accept(socket, yield[ec]);
if (ec)
{
fail(ec, "accept");
}
else
{
boost::asio::spawn(
acceptor.get_executor(),
std::bind(
&do_session,
websocket::stream<beast::ssl_stream<
beast::tcp_stream>>(std::move(socket), ctx),
std::placeholders::_1),
// we ignore the result of the session,
// most errors are handled with error_code
boost::asio::detached);
}
}
}
int main()
{
// TCP local
asio::io_context tcp_ioc(1);
tcp::endpoint tcp_ep(boost::asio::ip::address::from_string("127.0.0.1"), 9898);
co_spawn(tcp_ioc, tcp_listener(tcp_ep), asio::detached);
std::thread t1([&tcp_ioc]() { tcp_ioc.run(); });
cout << "TCP listening on port 8989..." << endl;
// HTTPS WebSocket
net::io_context wss_ioc;
tcp::endpoint wss_ep(boost::asio::ip::address::from_string("127.0.0.1"), 8989);
ssl::context ctx{ssl::context::tlsv12};
load_server_certificate(ctx);
// Spawn a listening port
boost::asio::spawn(wss_ioc,
std::bind(
&do_listen,
std::ref(wss_ioc),
std::ref(ctx),
wss_ep,
std::placeholders::_1),
// on completion, spawn will call this function
[](std::exception_ptr ex)
{
// if an exception occurred in the coroutine,
// it's something critical, e.g. out of memory
// we capture normal errors in the ec
// so we just rethrow the exception here,
// which will cause `ioc.run()` to throw
if (ex)
std::rethrow_exception(ex);
});
std::thread t2([&wss_ioc] { wss_ioc.run(); });
cout << "HTTPS WebSocket listening on port 8989..." << endl;
// Wait for threads
t1.join();
t2.join();
return 0;
}
我想保留一个已连接套接字的全局列表(
using Socket = Defer<tcp::socket>;
),其中每个套接字都有一个唯一的 ID,并且为 WebSocket 流提供一个全局变量。
我将为每个接受的 TCP 连接分配一个线程,并为 WebSocket 分配一个线程。
通过在全局变量中包含套接字和 websocket 流,我可以选择选择哪个套接字进行读取或写入。
谢谢
感觉很像缝合在一起的代码。
deferred_t
,但从来没有使用过该默认值我已将您的 267 行减少为 110 行,而没有本质上改变任何东西:
#include "server_certificate.hpp" // Got this from Boost Beast
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <iostream>
#include <syncstream>
#ifdef _MSC_VER
#pragma comment(lib, "crypt32.lib")
#pragma comment(lib, "libssl.lib")
#pragma comment(lib, "libcrypto.lib")
#endif
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using beast::error_code;
using beast::system_error;
using net::ip::tcp;
static inline auto out() { return std::osyncstream(std::cout); }
static inline void rethrow_handler(std::exception_ptr p) { if (p) std::rethrow_exception(p); }
net::awaitable<void> tcp_listen(tcp::endpoint ep) try {
auto session = [](tcp::socket s) -> net::awaitable<void> {
auto ep = s.remote_endpoint();
out() << "New TCP session for " << ep << std::endl;
net::streambuf buf;
for (error_code ec; !ec;) {
auto token = redirect_error(net::deferred, ec);
auto n = co_await s.async_read_some(buf.prepare(1024), token);
buf.commit(n);
if (buf.size() > 0) {
auto n = co_await async_write(s, buf, token);
buf.consume(n);
}
}
out() << "End of " << ep << " TCP session" << std::endl;
};
auto ex = co_await net::this_coro::executor;
tcp::acceptor acc(ex, ep);
out() << "TCP listening on " << acc.local_endpoint() << std::endl;
for (;;) {
co_spawn(ex, session(co_await acc.async_accept(net::deferred)), net::detached);
}
} catch (system_error const& se) {
out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
}
void wss_listen(tcp::endpoint endpoint, net::yield_context yield) try {
using ws_t = websocket::stream<beast::ssl_stream<beast::tcp_stream>>;
using sb_t = websocket::stream_base;
net::ssl::context ctx{net::ssl::context::tlsv12};
load_server_certificate(ctx);
auto ex = yield.get_executor();
tcp::acceptor acceptor(ex, endpoint);
acceptor.set_option(net::socket_base::reuse_address(true));
out() << "HTTPS WebSocket listening on port 8989..." << std::endl;
for (;;) {
ws_t ws(acceptor.async_accept(make_strand(ex), yield), ctx);
out() << "New WSS session for " << get_lowest_layer(ws).socket().remote_endpoint() << std::endl;
spawn(
yield.get_executor(),
[ws = std::move(ws)](net::yield_context yield) mutable {
try {
ws.next_layer().async_handshake(net::ssl::stream_base::server, yield);
ws.set_option(sb_t::timeout::suggested(beast::role_type::server));
ws.set_option(sb_t::decorator([](websocket::response_type& res) {
res.set(beast::http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-coro-ssl");
}));
// Accept the websocket handshake
for (ws.async_accept(yield);;) {
beast::flat_buffer buffer;
ws.async_read(buffer, yield);
// Echo the message back
ws.text(ws.got_text());
ws.async_write(buffer.data(), yield);
}
} catch (system_error const& se) {
if (se.code() != websocket::error::closed)
out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
}
},
net::detached);
}
} catch (system_error const& se) {
out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
}
int main() {
using namespace std::placeholders;
net::thread_pool ioc(1);
co_spawn(ioc, tcp_listen({{}, 9898}), net::detached); // tcp
net::spawn(ioc, bind(wss_listen, tcp::endpoint{{}, 8989}, _1), rethrow_handler); // wss
ioc.join();
}
有一些布丁证明:
[![在此处输入图像描述][1]][1]
问。 我想保留一个已连接套接字的全局列表[...],其中每个套接字都有一个唯一的 id,并且为 WebSocket 流提供一个全局变量。
为了什么目的?从你的描述看来你肯定不需要它。
问。 我将为每个接受的 TCP 连接分配一个线程,并为 WebSocket 分配一个线程。
再次,根据您的描述,您肯定不需要它。如果不需要的话我建议你不要复杂化。
Q. 通过在全局变量中包含套接字和 websocket 流,我可以选择选择哪个套接字进行读取或写入。
这如何帮助实现您的目标之一?也就是说,目标是什么?当前您有一个同时响应 TCP 和 WSS 连接的服务器。从你的问题开始,我希望你只监听 TCP 连接。
问。 这是实现与一个 Websocket 流的多个 TCP 连接的正确设计吗?感觉不太像:(
确实没有。
问。这就是为什么我希望对这个问题有所了解。
我最好的洞察是,你不知道自己想要/需要什么。想清楚了。
另一边
从那里拿走它。例如,如果您还需要处理/路由响应,您将如何做?正如您在这里猜测的那样,您需要一些多路复用,其中元数据告诉您/WS 服务器哪些响应与哪些请求相关。如果你无法描述你的需求,你永远无法实现它。
澄清这是什么对于会有很大帮助。例如。 WSS服务器是你写的吗?它讲特定的协议吗?哪个版本? TCP 客户端是否期望使用协议等等 [1]: