设计 TCP 到 WebSocket

问题描述 投票:0回答:1

我计划拥有 TCP 到 WebSocket 隧道。多个 TCP 连接到一个 WebSocket 连接(我知道这需要多路复用,但一次只做一件事)。这意味着有一个 TCP 套接字在本地侦听并通过 WebSocket 转发从 TCP 套接字接收到的所有内容。

最近在学习Boost Asio和Boost Beast,到目前为止写了以下代码。它还远未完成,但由于我对协程和 Boost 还很陌生,所以我有一些设计问题,

#include "server_certificate.hpp"  // Got this from Boost Beast
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/spawn.hpp>

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/json/src.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <memory>
#include <time.h>

#pragma comment(lib, "crypt32.lib")
#pragma comment(lib, "libssl.lib")
#pragma comment(lib, "libcrypto.lib")

using namespace std;
namespace json = boost::json;
namespace asio = boost::asio;
namespace this_coro = asio::this_coro;
using asio::ip::tcp;
using boost::system::error_code;
using namespace asio::experimental::awaitable_operators;

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

template <typename T> using Defer = asio::deferred_t::as_default_on_t<T>;
using Socket = Defer<tcp::socket>;
using Acceptor = Defer<tcp::acceptor>;



void
fail(beast::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}



/*  TCP listening locally   */
asio::awaitable<void> tcp_session(Socket socket)
{
    auto ep = socket.remote_endpoint();
    std::cout << "New TCP session for " << ep << "\n";
    asio::streambuf buf;

    size_t requests_handled = 0;
    for (boost::system::error_code ec; !ec;)
    {
        auto token = redirect_error(asio::deferred, ec);

        auto n = co_await socket.async_read_some(buf.prepare(1024), token);
        buf.commit(n);

        auto eof = ec == asio::error::eof;
        if (buf.size() > 0)
        {
            requests_handled += 1;
            auto n = co_await async_write(socket, buf, token);
            buf.consume(n);
        }
    }

    std::cout << "End of " << ep << " TCP session" << endl;
}


asio::awaitable<void> tcp_listener(tcp::endpoint& ep)
{
    auto ex = co_await this_coro::executor;
    Acceptor acc(ex, ep);
    for (;;)
    {
        co_spawn(ex, tcp_session(co_await acc.async_accept()), asio::detached);
    }
}



/*  HTTPS WebSocket listening on Internet   */
// Echoes back all received WebSocket messages
void do_session(
    websocket::stream<beast::ssl_stream<beast::tcp_stream>>&ws,
    net::yield_context yield)
{
    beast::error_code ec;

    // Set the timeout.
    beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));

    // Perform the SSL handshake
    ws.next_layer().async_handshake(ssl::stream_base::server, yield[ec]);
    if (ec)
    {
        return fail(ec, "handshake");
    }

    // Turn off the timeout on the tcp_stream, because
    // the websocket stream has its own timeout system.
    beast::get_lowest_layer(ws).expires_never();

    // Set suggested timeout settings for the websocket
    ws.set_option(
        websocket::stream_base::timeout::suggested(
            beast::role_type::server));

    // Set a decorator to change the Server of the handshake
    ws.set_option(websocket::stream_base::decorator(
        [](websocket::response_type& res)
        {
            res.set(http::field::server,
            std::string(BOOST_BEAST_VERSION_STRING) +
            " websocket-server-coro-ssl");
        }));

    // Accept the websocket handshake
    ws.async_accept(yield[ec]);
    if (ec)
    {
        return fail(ec, "accept");
    }

    for (;;)
    {
        // This buffer will hold the incoming message
        beast::flat_buffer buffer;

        // Read a message
        ws.async_read(buffer, yield[ec]);

        // This indicates that the session was closed
        if (ec == websocket::error::closed)
        {
            break;
        }

        if (ec)
        {
            return fail(ec, "read");
        }

        // Echo the message back
        ws.text(ws.got_text());
        ws.async_write(buffer.data(), yield[ec]);
        if (ec)
        {
            return fail(ec, "write");
        }
    }
}

void do_listen(
    net::io_context& ioc,
    ssl::context& ctx,
    tcp::endpoint endpoint,
    net::yield_context yield)
{
    beast::error_code ec;

    // Open the acceptor
    tcp::acceptor acceptor(ioc);
    acceptor.open(endpoint.protocol(), ec);
    if (ec)
    {
        return fail(ec, "open");
    }

    // Allow address reuse
    acceptor.set_option(net::socket_base::reuse_address(true), ec);
    if (ec)
    {
        return fail(ec, "set_option");
    }

    // Bind to the server address
    acceptor.bind(endpoint, ec);
    if (ec)
    {
        return fail(ec, "bind");
    }

    // Start listening for connections
    acceptor.listen(net::socket_base::max_listen_connections, ec);
    if (ec)
    {
        return fail(ec, "listen");
    }

    for (;;)
    {
        tcp::socket socket(ioc);
        acceptor.async_accept(socket, yield[ec]);
        if (ec)
        {
            fail(ec, "accept");
        }
        else
        {
            boost::asio::spawn(
                acceptor.get_executor(),
                std::bind(
                    &do_session,
                    websocket::stream<beast::ssl_stream<
                    beast::tcp_stream>>(std::move(socket), ctx),
                    std::placeholders::_1),
                // we ignore the result of the session,
                // most errors are handled with error_code
                boost::asio::detached);
        }
    }
}



int main()
{
    // TCP local
    asio::io_context tcp_ioc(1);
    tcp::endpoint tcp_ep(boost::asio::ip::address::from_string("127.0.0.1"), 9898);
    co_spawn(tcp_ioc, tcp_listener(tcp_ep), asio::detached);
    std::thread t1([&tcp_ioc]() {  tcp_ioc.run(); });
    cout << "TCP listening on port 8989..." << endl;

    // HTTPS WebSocket
    net::io_context wss_ioc;
    tcp::endpoint wss_ep(boost::asio::ip::address::from_string("127.0.0.1"), 8989);
    ssl::context ctx{ssl::context::tlsv12};
    load_server_certificate(ctx);
    // Spawn a listening port
    boost::asio::spawn(wss_ioc,
        std::bind(
            &do_listen,
            std::ref(wss_ioc),
            std::ref(ctx),
            wss_ep,
            std::placeholders::_1),
        // on completion, spawn will call this function
        [](std::exception_ptr ex)
        {
            // if an exception occurred in the coroutine,
            // it's something critical, e.g. out of memory
            // we capture normal errors in the ec
            // so we just rethrow the exception here,
            // which will cause `ioc.run()` to throw
            if (ex)
                std::rethrow_exception(ex);
        });
    std::thread t2([&wss_ioc] { wss_ioc.run(); });
    cout << "HTTPS WebSocket listening on port 8989..." << endl;

    // Wait for threads
    t1.join();
    t2.join();

    return 0;
}

我想保留一个已连接套接字的全局列表(

using Socket = Defer<tcp::socket>;
),其中每个套接字都有一个唯一的 ID,并且为 WebSocket 流提供一个全局变量。

我将为每个接受的 TCP 连接分配一个线程,并为 WebSocket 分配一个线程。

通过在全局变量中包含套接字和 websocket 流,我可以选择选择哪个套接字进行读取或写入。

  • 这是实现与一个 Websocket 流的多个 TCP 连接的正确设计吗?感觉不太像:(,这就是为什么我希望对这个问题有所了解。

谢谢

c++ websocket boost boost-asio boost-beast
1个回答
0
投票

感觉很像缝合在一起的代码。

  • 奇怪的是,你在某些 IO 对象上竭尽全力使用默认值
    deferred_t
    ,但从来没有使用过该默认值
  • TCP 服务器采用 C++20 协程似乎是任意的,但 WSS 版本采用了严格的堆栈协程实现,该实现依赖于具有平台限制的多个库
  • 最重要的是,您通过引用获取协程参数。这是......一个真正的设计问题,因为它们是异步运行的

我已将您的 267 行减少为 110 行,而没有本质上改变任何东西:

住在Coliru

#include "server_certificate.hpp" // Got this from Boost Beast
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <iostream>
#include <syncstream>
#ifdef _MSC_VER
    #pragma comment(lib, "crypt32.lib")
    #pragma comment(lib, "libssl.lib")
    #pragma comment(lib, "libcrypto.lib")
#endif

namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using beast::error_code;
using beast::system_error;
using net::ip::tcp;

static inline auto out()                                 { return std::osyncstream(std::cout); } 
static inline void rethrow_handler(std::exception_ptr p) { if (p) std::rethrow_exception(p);   } 

net::awaitable<void> tcp_listen(tcp::endpoint ep) try {
    auto session = [](tcp::socket s) -> net::awaitable<void> {
        auto ep = s.remote_endpoint();
        out() << "New TCP session for " << ep << std::endl;
        net::streambuf buf;

        for (error_code ec; !ec;) {
            auto token = redirect_error(net::deferred, ec);

            auto n = co_await s.async_read_some(buf.prepare(1024), token);
            buf.commit(n);

            if (buf.size() > 0) {
                auto n = co_await async_write(s, buf, token);
                buf.consume(n);
            }
        }

        out() << "End of " << ep << " TCP session" << std::endl;
    };

    auto          ex = co_await net::this_coro::executor;
    tcp::acceptor acc(ex, ep);
    out() << "TCP listening on " << acc.local_endpoint() << std::endl;
    for (;;) {
        co_spawn(ex, session(co_await acc.async_accept(net::deferred)), net::detached);
    }
} catch (system_error const& se) {
    out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
}

void wss_listen(tcp::endpoint endpoint, net::yield_context yield) try {
    using ws_t = websocket::stream<beast::ssl_stream<beast::tcp_stream>>;
    using sb_t = websocket::stream_base;
    net::ssl::context ctx{net::ssl::context::tlsv12};
    load_server_certificate(ctx);

    auto          ex = yield.get_executor();
    tcp::acceptor acceptor(ex, endpoint);
    acceptor.set_option(net::socket_base::reuse_address(true));
    out() << "HTTPS WebSocket listening on port 8989..." << std::endl;

    for (;;) {
        ws_t ws(acceptor.async_accept(make_strand(ex), yield), ctx);
        out() << "New WSS session for " << get_lowest_layer(ws).socket().remote_endpoint() << std::endl;

        spawn(
            yield.get_executor(),
            [ws = std::move(ws)](net::yield_context yield) mutable {
                try {
                    ws.next_layer().async_handshake(net::ssl::stream_base::server, yield);
                    ws.set_option(sb_t::timeout::suggested(beast::role_type::server));
                    ws.set_option(sb_t::decorator([](websocket::response_type& res) {
                        res.set(beast::http::field::server,
                                std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-coro-ssl");
                    }));

                    // Accept the websocket handshake
                    for (ws.async_accept(yield);;) {
                        beast::flat_buffer buffer;
                        ws.async_read(buffer, yield);

                        // Echo the message back
                        ws.text(ws.got_text());
                        ws.async_write(buffer.data(), yield);
                    }
                } catch (system_error const& se) {
                    if (se.code() != websocket::error::closed)
                        out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
                }
            },
            net::detached);
    }
} catch (system_error const& se) {
    out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
}

int main() {
    using namespace std::placeholders;
    net::thread_pool ioc(1);

    co_spawn(ioc, tcp_listen({{}, 9898}), net::detached);                            // tcp
    net::spawn(ioc, bind(wss_listen, tcp::endpoint{{}, 8989}, _1), rethrow_handler); // wss

    ioc.join();
}

有一些布丁证明:

[![在此处输入图像描述][1]][1]

问题

问。 我想保留一个已连接套接字的全局列表[...],其中每个套接字都有一个唯一的 id,并且为 WebSocket 流提供一个全局变量。

为了什么目的?从你的描述看来你肯定不需要它。

问。 我将为每个接受的 TCP 连接分配一个线程,并为 WebSocket 分配一个线程。

再次,根据您的描述,您肯定不需要它。如果不需要的话我建议你不要复杂化。

Q. 通过在全局变量中包含套接字和 websocket 流,我可以选择选择哪个套接字进行读取或写入。

这如何帮助实现您的目标之一?也就是说,目标是什么?当前您有一个同时响应 TCP 和 WSS 连接的服务器。从你的问题开始,我希望你只监听 TCP 连接。

问。 这是实现与一个 Websocket 流的多个 TCP 连接的正确设计吗?感觉不太像:(

确实没有。

问。这就是为什么我希望对这个问题有所了解。

我最好的洞察是,你不知道自己想要/需要什么。想清楚了。

  • 服务器接受 TCP 连接,启动会话
  • 会话收到一条“消息”(什么是消息?考虑一下框架、大小限制、超时)
  • 会话将消息推送到队列中以中继到 WS 服务器

另一边

  • 服务器(作为客户端)连接到 WS 服务器
  • 客户端监视一个队列,其中的消息将被发送到 WS 服务器。

从那里拿走它。例如,如果您还需要处理/路由响应,您将如何做?正如您在这里猜测的那样,您需要一些多路复用,其中元数据告诉您/WS 服务器哪些响应与哪些请求相关。如果你无法描述你的需求,你永远无法实现它。

澄清这是什么对于会有很大帮助。例如。 WSS服务器是你写的吗?它讲特定的协议吗?哪个版本? TCP 客户端是否期望使用协议等等 [1]:https://i.stack.imgur.com/GL07p.gif

© www.soinside.com 2019 - 2024. All rights reserved.