Boost ASIO 在调用回调函数时崩溃

问题描述 投票:0回答:1

我是 Boost 库的新手。我使用 Boost.Beast 的异步(async)方法编写了一个服务器,但它偶尔会崩溃。这种情况并不常见,有时要运行 3-4 小时才会崩溃。下面附上我从应用程序崩溃转储文件中得到的崩溃点截图。

请帮我找出崩溃的原因。

我在谷歌上做了一些研究,还检查了这个链接“https://www.boost.org/doc/libs/1_54_0/boost/asio/detail/deadline_timer_service.hpp”,但无法理解它。

头文件WebSocketServer.h

#include "server_certificate.hpp"
#include "xlogger.h"
#include "TSWebSocket.h"

#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/mutex.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <map>
#include <queue>

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// The following ifdef block is the standard way of creating macros which make exporting 
// from a DLL simpler. All files within this DLL are compiled with the WEBSOCKETSERVER_EXPORTS
// symbol defined on the command line. This symbol should not be defined on any project
// that uses this DLL. This way any other project whose source files include this file see 
// WEBSOCKETSERVER_API functions as being imported from a DLL, whereas this DLL sees symbols
// defined with this macro as being exported.
#ifdef WEBSOCKETSERVER_EXPORTS
#define WEBSOCKETSERVER_API __declspec(dllexport)
#else
#define WEBSOCKETSERVER_API __declspec(dllimport)
#endif
// Forward declaration: the listener's session map refers to WebSocketSession,
// which is declared further down in this header.
class WebSocketSession;
// Accepts incoming connections and launches the sessions.
// The listener also keeps a map from client id to session so that callers can
// send to, or remove, a specific client by id.
class WEBSOCKETSERVER_API WebSocketListener : public std::enable_shared_from_this<WebSocketListener>
{
private:
    
    // Event sink copied into every session that on_accept() creates; set via setEventHandler().
    std::shared_ptr<ISocketEvents> m_pEvent;
    // Guards m_mapClientidSession (entered in on_accept, SendMessageToClient and RemoveClient).
    CRITICAL_SECTION m_hCSClientidSession;
    // Live sessions keyed by client id; on_accept() uses the accepted socket's native handle as the id.
    std::map<UINT, std::shared_ptr<WebSocketSession>> m_mapClientidSession;
public:
    // Opens, configures, binds and listens on the file-level acceptor for `endpoint`.
    WebSocketListener(tcp::endpoint endpoint);
    ~WebSocketListener();
    // Starts the asynchronous accept loop.
    void Run();
    // Replaces the event sink handed to sessions created from now on.
    void setEventHandler(std::shared_ptr<ISocketEvents> pEvent);
    // Forwards strMessage to the session registered under uClientid; false if the id is unknown.
    bool SendMessageToClient(UINT uClientid, std::string strMessage);
    // Closes the session registered under uClientid and erases it from the map; false if unknown.
    bool RemoveClient(UINT uClientid);
private:
    // Queues one asynchronous accept on the file-level acceptor.
    void do_accept();
    // Completion handler for do_accept(): creates and registers a session, then re-arms the accept.
    void on_accept(beast::error_code ec, tcp::socket socket);
};

//------------------------------------------------------------------------------

// One TLS WebSocket connection. Instances are created by WebSocketListener::on_accept()
// and keep themselves alive through shared_from_this() bound into their completion handlers.
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession>
{
    // WebSocket layered over TLS layered over a Beast tcp_stream.
    websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
    // Receives the HTTP upgrade request (on_handshake) and later the WebSocket messages (do_read).
    beast::flat_buffer m_ReadBuffer;
    // Outgoing bytes appended by SendMessageToClient() and consumed when a write completes.
    beast::flat_buffer m_WriteBuffer;
    // Event sink for connect / handshake / receive notifications; may be empty.
    std::shared_ptr<ISocketEvents> m_pEvent;
    // Native socket handle captured at accept time; used as the client id in logs and callbacks.
    SOCKET m_SocketDescriptor;
    // Set once do_read() has reported the completed handshake through onHandShake().
    bool bHandShake;
public:
    // Take ownership of the socket
    WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId);
    // Dispatches on_run() onto the stream's executor.
    void run();
    // Starts an asynchronous WebSocket close.
    void closeSocket();
    // Arms a 30 second timeout and starts the server-side TLS handshake.
    void on_run();
    // TLS handshake completion: configures the WebSocket and reads the HTTP upgrade request.
    void on_handshake(beast::error_code ec);
    // WebSocket accept completion: starts reading, or disconnects on error.
    void on_accept(beast::error_code ec);
    // HTTP upgrade request read completion: notifies OnConnect and accepts the WebSocket.
    void on_upgrade(beast::error_code ec, size_t);
    // Queues one asynchronous WebSocket read.
    void do_read();
    // Read completion: delivers the payload through OnReceive and re-arms the read.
    void on_read(beast::error_code ec, std::size_t bytes_transferred);
    // Returns the socket descriptor converted to UINT.
    UINT getSocketDescriptor();
    // Appends strMessage to the write buffer and starts an asynchronous write.
    bool SendMessageToClient(std::string strMessage);
    // Reports the disconnect through OnConnect(false), or removes the client from the listener
    // when no event sink is set.
    void do_Disconnect();
    ~WebSocketSession();
};

//------------------------------------------------------------------------------

// Factory function that creates instances of the Server Protocol object.
//   port        - TCP port the listener binds to.
//   maxThreads  - requested number of I/O threads (how it is honoured is up to the implementation).
//   certificate - certificate data passed on to load_server_certificate().
//   privateKey  - private key data passed on to load_server_certificate().
// Returns the shared listener instance.
WEBSOCKETSERVER_API std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey);
//------------------------------------------------------------------------------

源文件WebSocketServer.cpp

    #include "stdafx.h"
#include "WebSocketServer.h"
//------------------------------------------------------------------------------

// Thread-count constant: used as the io_context concurrency hint below and referenced by
// InitializeWebSocketListener() when it spawns the worker threads.
#define NO_OF_THREAD 50

// The single listener created by InitializeWebSocketListener(); sessions reach it through
// this pointer in WebSocketSession::do_Disconnect().
static std::shared_ptr<WebSocketListener> objWebSocketListener = nullptr;

// The io_context is required for all I/O
// NOTE(review): the constructor argument is a concurrency hint, not a thread count.
boost::asio::io_context g_ioc{ NO_OF_THREAD };

// The SSL context is required, and holds certificates
ssl::context g_ctx{ ssl::context::tlsv12 };

// Listening socket, bound to its own strand on g_ioc. It is declared after g_ioc because it
// is constructed from it.
tcp::acceptor g_acceptor(net::make_strand(g_ioc));
// HTTP upgrade request filled by WebSocketSession::on_handshake() and consumed by on_upgrade().
// NOTE(review): this single object is shared by every session, so handshakes running on
// different threads access it without synchronization; it looks like it should be per session.
http::request<http::string_body> g_upgrade_request;

// Worker-thread entry point, defined below.
void RunIOContextThread();

// Creates the process-wide WebSocketListener, starts its accept loop and spawns the
// worker threads that run the global io_context.
//   port        - TCP port to listen on; the listener binds to 0.0.0.0.
//   maxThreads  - number of worker threads to start; values <= 0 fall back to NO_OF_THREAD.
//   certificate - forwarded to load_server_certificate().
//   privateKey  - forwarded to load_server_certificate().
// Returns the listener, which is also retained in the file-level objWebSocketListener.
// NOTE(review): accepting starts before the caller can call setEventHandler(), and the
// worker threads are detached, so nothing here can wait for them at shutdown.
std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey)
{
    auto const address = net::ip::make_address("0.0.0.0");

    // This holds the self-signed certificate used by the server
    load_server_certificate(g_ctx, certificate, privateKey);

    // Create and launch a listening port. make_shared either returns a valid object or
    // throws, so no null check is needed before using the pointer.
    objWebSocketListener = std::make_shared<WebSocketListener>(tcp::endpoint{ address, port });
    objWebSocketListener->Run();

    // Run the I/O service on the requested number of threads. The parameter used to be
    // ignored; non-positive values keep the previous fixed thread count.
    const int threadCount = (maxThreads > 0) ? maxThreads : NO_OF_THREAD;
    for (int i = 0; i < threadCount; ++i)
    {
        std::thread ioContextThread(&RunIOContextThread);
        ioContextThread.detach();
    }

    return objWebSocketListener;
}

// Worker-thread entry point: runs the global io_context until it has no more work.
void RunIOContextThread()
{
    // std::thread::id is a class type and must not be passed to a printf-style "%d"
    // (assuming stl::log::trace is printf-style, as its format strings suggest).
    // Log the id's hash as a plain integer instead.
    const unsigned long long threadTag =
        static_cast<unsigned long long>(std::hash<std::thread::id>{}(std::this_thread::get_id()));
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "RunIOContextThread thread started %llu", threadTag);

    // An exception escaping a completion handler propagates out of run(). Letting it leave
    // this thread function would terminate the process, so log it and resume running.
    for (;;)
    {
        try
        {
            g_ioc.run();
            break;
        }
        catch (const std::exception& e)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "RunIOContextThread handler threw: %s", e.what());
        }
        catch (...)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "RunIOContextThread handler threw an unknown exception");
        }
    }
}

//------------------------------------------------------------------------------

// Listener class
// Releases the critical section that the constructor initialises unconditionally.
WebSocketListener::~WebSocketListener()
{
    DeleteCriticalSection(&m_hCSClientidSession);
}

// Prepares the file-level acceptor for `endpoint`: open, enable address reuse, enable
// TCP no-delay, bind, listen. The first failing step is logged and ends construction
// early; no exception is thrown and the object is still constructed.
WebSocketListener::WebSocketListener(tcp::endpoint endpoint)
    : m_pEvent(nullptr)
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Enter");
    InitializeCriticalSection(&m_hCSClientidSession);

    beast::error_code ec;

    // Reports whether the step that just ran failed, logging it with `format` if so.
    const auto stepFailed = [&ec](const char* format) -> bool
    {
        if (!ec)
        {
            return false;
        }
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, format, ec.message().c_str());
        return true;
    };

    // Open the acceptor
    g_acceptor.open(endpoint.protocol(), ec);
    if (stepFailed("WebSocketListener::WebSocketListener open failed, error message: %s"))
        return;

    // Allow address reuse
    g_acceptor.set_option(net::socket_base::reuse_address(true), ec);
    if (stepFailed("WebSocketListener::WebSocketListener set_option failed, error message: %s"))
        return;

    // Disable Nagle's algorithm on the listening socket
    g_acceptor.set_option(tcp::no_delay(true), ec);
    if (stepFailed("WebSocketListener::WebSocketListener set_option(No delay) failed, error message: %s"))
        return;

    // Bind to the server address
    g_acceptor.bind(endpoint, ec);
    if (stepFailed("WebSocketListener::WebSocketListener bind failed, error message: %s"))
        return;

    // Start listening for connections
    g_acceptor.listen(net::socket_base::max_listen_connections, ec);
    if (stepFailed("WebSocketListener::WebSocketListener listen failed, error message: %s"))
        return;

    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::WebSocketListener Started listening at %s:%hu", g_acceptor.local_endpoint().address().to_string().c_str(), g_acceptor.local_endpoint().port());

    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Leave");
}

// Closes the session registered under uClientid and erases it from the session map.
// Returns true if the id was found and removed; false if it is unknown or an exception
// was caught.
bool WebSocketListener::RemoveClient(UINT uClientid)
{
    bool bRet = false;
    try
    {
        EnterCriticalSection(&m_hCSClientidSession);
        auto iter = m_mapClientidSession.find(uClientid);
        if (iter != m_mapClientidSession.end())
        {
            iter->second->closeSocket();
            // use_count() returns long; convert so the argument matches "%d".
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::RemoveClient Removed from map uClientid : %d refCount: %d", uClientid, static_cast<int>(iter->second.use_count()));
            m_mapClientidSession.erase(iter);
            bRet = true;
        }
        LeaveCriticalSection(&m_hCSClientidSession);
    }
    catch (const std::exception& e)
    {
        // Anything that throws above does so while the critical section is held, so it
        // has to be released here (same pattern as SendMessageToClient).
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Generic Catch");
    }
    return bRet;
}

// Looks up the session registered under uClientid and forwards strMessage to it.
// Both the lookup and the forwarded call run while the session-map critical section is held.
// Returns the session's result; false if the id is unknown or an exception was caught.
bool WebSocketListener::SendMessageToClient(UINT uClientid, std::string strMessage)
{
    bool delivered = false;
    try
    {
        EnterCriticalSection(&m_hCSClientidSession);

        const auto entry = m_mapClientidSession.find(uClientid);
        if (entry == m_mapClientidSession.end())
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient ClientID: %d not found", uClientid);
        }
        else
        {
            delivered = entry->second->SendMessageToClient(strMessage);
        }

        LeaveCriticalSection(&m_hCSClientidSession);
    }
    catch (const std::exception& e)
    {
        // The throw happened while the critical section was held.
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Generic Catch");
    }
    return delivered;
}

// Installs the event sink that on_accept() hands to every session it creates afterwards.
// Sessions that were already created keep the handler they were constructed with.
void WebSocketListener::setEventHandler(std::shared_ptr<ISocketEvents> pEvent)
{
    m_pEvent = std::move(pEvent);
}

// Start accepting incoming connections
void WebSocketListener::Run()
{
    do_accept();
}

// Queues one asynchronous accept on the file-level acceptor. The completion is routed to
// on_accept(), and the bound shared_from_this() keeps the listener alive until then.
void WebSocketListener::do_accept()
{
    try
    {
        auto onAccepted = beast::bind_front_handler(
            &WebSocketListener::on_accept,
            shared_from_this());

        // The new connection gets its own strand
        g_acceptor.async_accept(net::make_strand(g_ioc), std::move(onAccepted));
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::do_accept Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::do_accept Inside Generic Catch");
    }
}

// Completion handler for do_accept().
// On success: enables TCP no-delay on the new socket, wraps it in a WebSocketSession,
// registers the session under the socket's native handle, and starts it.
// On failure: logs and drops the connection.
// In every case except a cancelled or closed acceptor, the next accept is queued.
void WebSocketListener::on_accept(beast::error_code ec, tcp::socket socket)
{
    try
    {
        // A cancelled or closed acceptor would fail every further accept immediately;
        // re-arming it would spin forever, so the loop ends here.
        if (ec && (ec == net::error::operation_aborted || !g_acceptor.is_open()))
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept accept loop stopped, message: %s", ec.message().c_str());
            return;
        }

        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept accept failed, error SocketID: %d, message: %s", static_cast<int>(socket.native_handle()), ec.message().c_str());
        }
        else
        {
            socket.set_option(tcp::no_delay(true), ec);
            if (ec)
            {
                // Drop this connection but fall through to do_accept(). Returning here
                // used to stop the server from accepting any further client.
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept set_option(No delay) failed, error message: %s", ec.message().c_str());
            }
            else
            {
                const std::string sClientIp = socket.remote_endpoint().address().to_string();
                const unsigned short uiClientPort = socket.remote_endpoint().port();
                const SOCKET socketId = socket.native_handle();
                // The map key type is UINT; make the narrowing from SOCKET explicit.
                const UINT uClientid = static_cast<UINT>(socketId);

                // Create the WebSocketSession
                std::shared_ptr<WebSocketSession> objSession = std::make_shared<WebSocketSession>(std::move(socket), g_ctx, m_pEvent, socketId);

                // Register before starting, so a session that fails early can already be
                // found by RemoveClient().
                bool bInserted = false;
                EnterCriticalSection(&m_hCSClientidSession);
                try
                {
                    auto result = m_mapClientidSession.emplace(uClientid, objSession);
                    bInserted = result.second;
                    if (!bInserted)
                    {
                        // The handle value was reused: replace the stale entry.
                        result.first->second = objSession;
                    }
                }
                catch (...)
                {
                    // Never leave with the critical section still held.
                    LeaveCriticalSection(&m_hCSClientidSession);
                    throw;
                }
                LeaveCriticalSection(&m_hCSClientidSession);

                if (bInserted)
                {
                    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept insert client id : %d", static_cast<int>(uClientid));
                }
                else
                {
                    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept update client id : %d", static_cast<int>(uClientid));
                }

                objSession->run();

                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::on_accept Incoming connection request from SocketID: %d, IP: %s Port: %hu", static_cast<int>(uClientid), sClientIp.c_str(), uiClientPort);
            }
        }
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Generic Catch");
    }
    // Accept another connection
    do_accept();
}

//------------------------------------------------------------------------------

// Session class
// Takes ownership of the accepted socket and wraps it in the TLS + WebSocket stream.
// socketId is the socket's native handle as captured by the caller; both buffers are
// pre-sized to 1000 bytes.
WebSocketSession::WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId)
    : ws_(std::move(socket), ctx)
    , m_pEvent(pEvent)
    , m_SocketDescriptor(socketId)
    , bHandShake(false)
{
    m_ReadBuffer.reserve(1000);
    m_WriteBuffer.reserve(1000);
}

// Logs the teardown and, if the WebSocket is still open, closes it synchronously.
// NOTE(review): the close below is a blocking call made from whichever thread releases
// the last reference to the session.
WebSocketSession::~WebSocketSession()
{
    // The log format uses "%d"; pass the descriptor as an int so the argument matches.
    const int socketId = static_cast<int>(m_SocketDescriptor);
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d Closing ", socketId);
    if (ws_.is_open())
    {
        beast::websocket::close_reason closeReason;
        beast::error_code ec;
        ws_.close(closeReason, ec);
        if (ec)
        {
            // The format has two conversions (%d and %s); the socket id argument was
            // missing, which made "%s" read an argument that was never passed.
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d clsoing error msg: %s", socketId, ec.message().c_str());
        }
    }
}

// Hops onto the stream's executor and continues in on_run().
void WebSocketSession::run()
{
    try
    {
        // Async operations on this session's I/O objects have to be started from within
        // its strand; that is not strictly needed for a single-threaded io_context, but
        // this code is written to be thread-safe by default.
        auto startOnStrand = beast::bind_front_handler(
            &WebSocketSession::on_run,
            shared_from_this());
        net::dispatch(ws_.get_executor(), std::move(startOnStrand));
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::run Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::run Inside Generic Catch");
    }
}

void WebSocketSession::closeSocket() 
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket Closing SocketID: %d", m_SocketDescriptor);
    beast::websocket::close_reason closeReason;
    beast::error_code ec; 
    UINT ID = m_SocketDescriptor;
    ws_.async_close(closeReason, [=](beast::error_code ec)
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket SocketID: %d, Code: %d Message: %s", ID, closeReason, ec.message().c_str());
        }
    }
    );
}

// First step on the session's executor: arm a 30 second deadline on the TCP layer and
// begin the server-side TLS handshake, continuing in on_handshake().
void WebSocketSession::on_run()
{
    try
    {
        // Set the timeout.
        auto& tcpLayer = beast::get_lowest_layer(ws_);
        tcpLayer.expires_after(std::chrono::seconds(30));

        // Perform the SSL handshake
        auto onTlsDone = beast::bind_front_handler(
            &WebSocketSession::on_handshake,
            shared_from_this());
        ws_.next_layer().async_handshake(ssl::stream_base::server, std::move(onTlsDone));
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_run Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_run Inside Generic Catch");
    }
}

// TLS handshake completion. On success it switches from the TCP-level deadline to the
// WebSocket's own timeouts, installs the Server-header decorator and reads the HTTP
// upgrade request, continuing in on_upgrade(). On failure it only logs and returns.
// NOTE(review): the request is read into the file-level g_upgrade_request, which every
// session shares; it looks like this should be per-session state.
void WebSocketSession::on_handshake(beast::error_code ec)
{
    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake handshake failed, error SocketID: %d, message: %s", m_SocketDescriptor, ec.message().c_str());
            return;
        }

        // Turn off the timeout on the tcp_stream, because
        // the websocket stream has its own timeout system.
        auto& tcpLayer = beast::get_lowest_layer(ws_);
        tcpLayer.expires_never();

        // Set suggested timeout settings for the websocket
        const auto serverTimeouts = websocket::stream_base::timeout::suggested(beast::role_type::server);
        ws_.set_option(serverTimeouts);

        // Set a decorator to change the Server of the handshake
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& response)
        {
            response.set(http::field::server,
                std::string(BOOST_BEAST_VERSION_STRING) +
                " websocket-server-async-ssl");
        }));

        // Read the HTTP upgrade request; on_upgrade() accepts it.
        auto onRequestRead = beast::bind_front_handler(
            &WebSocketSession::on_upgrade,
            shared_from_this());
        http::async_read(ws_.next_layer(), m_ReadBuffer, g_upgrade_request, std::move(onRequestRead));
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Generic Catch");
    }
}

// Completion of the HTTP upgrade request read started by on_handshake().
// On success it logs the request headers, reports the connection through OnConnect(true)
// and starts the WebSocket accept, continuing in on_accept(). On failure it only logs.
// NOTE(review): g_upgrade_request is a file-level object shared by every session.
void WebSocketSession::on_upgrade(beast::error_code ec, size_t)
{
    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade upgrade failed, error SocketID: %d, message: %s", m_SocketDescriptor, ec.message().c_str());
            return;
        }

        // "%s" needs a C string. The std::string object itself used to be passed through
        // the variadic call, which is undefined behaviour.
        const std::string strHeaders = boost::lexical_cast<std::string>(g_upgrade_request.base());
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_upgrade Handshake completed for SocketID: %d, WebSocket Headers: %s", m_SocketDescriptor, strHeaders.c_str());

        if (m_pEvent != nullptr)
        {
            m_pEvent->OnConnect(m_SocketDescriptor, true, "", 0);
        }
        // Accept the websocket handshake
        ws_.async_accept(
            g_upgrade_request,
            beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Generic Catch");
    }
}

// WebSocket accept completion: start the read loop on success, otherwise run the
// disconnect path.
void WebSocketSession::on_accept(beast::error_code ec)
{
    if (!ec)
    {
        // Read a message
        do_read();
        return;
    }
    do_Disconnect();
}

void WebSocketSession::do_read()
{
    static boost::mutex objMutex;
    try
    {
        objMutex.lock();
        if (m_pEvent && !bHandShake)
        {
            bHandShake = true;
            m_pEvent->onHandShake(m_SocketDescriptor, bHandShake);
        }
        // Read a message into our buffer
                
        ws_.async_read(
                m_ReadBuffer,
                beast::bind_front_handler(
                    &WebSocketSession::on_read,
                    shared_from_this()));
        objMutex.unlock();
    }
    catch (exception& e)
    {
        objMutex.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        objMutex.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Generic Catch");
    }
}

// Read completion. On any error (including a clean close by the peer) the disconnect
// path runs and reading stops. Otherwise the buffered payload is copied and delivered
// through OnReceive(), the buffer is emptied, and the next read is queued.
void WebSocketSession::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
    // Everything readable in the buffer is delivered and then discarded, so the byte
    // count reported by the read is not needed.
    boost::ignore_unused(bytes_transferred);
    try
    {
        if (ec)
        {
            do_Disconnect();
            // websocket::error::closed indicates that the WebSocketSession was closed
            if (ec == websocket::error::closed)
            {
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client closed  SocketID: %d", static_cast<int>(m_SocketDescriptor));
            }
            return;
        }

        const std::size_t pendingBytes = m_ReadBuffer.size();
        if (m_pEvent != nullptr && pendingBytes > 0)
        {
            const unsigned int iLen = static_cast<unsigned int>(pendingBytes);
            // Convert explicitly so every argument matches its "%d" conversion.
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client SocketID: %d dataRecv: %d cpacity: %d", static_cast<int>(m_SocketDescriptor), static_cast<int>(iLen), static_cast<int>(m_ReadBuffer.capacity()));

            // Hand the callback its own mutable copy. The vector owns the memory, so there
            // is no raw new[] and no use of the pointer before it is known to be valid.
            const char* first = static_cast<const char*>(m_ReadBuffer.cdata().data());
            std::vector<char> payload(first, first + pendingBytes);
            m_pEvent->OnReceive(m_SocketDescriptor, payload.data(), iLen);
        }
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Generic Catch");
    }

    // Discard exactly what was offered to the callback, even if the callback threw.
    // Otherwise the same bytes would be delivered again with the next message.
    m_ReadBuffer.consume(m_ReadBuffer.size());
    do_read();
}

// Returns the stored socket descriptor converted to UINT.
UINT WebSocketSession::getSocketDescriptor()
{
    return static_cast<UINT>(m_SocketDescriptor);
}

// Releases an array previously allocated with new char[]; a null pointer is a no-op.
void testDelete(char* p)
{
    std::default_delete<char[]>()(p);
}

// Deleter functor for char arrays allocated with new char[]: logs, then frees the array.
struct DeleteChar {
    void operator()(char* p) const {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "Deleting the buffer");
        std::default_delete<char[]>()(p);
    }
};

bool WebSocketSession::SendMessageToClient(std::string strMessage) 
{
    bool bRet = false;
    static std::mutex obj;
    try {
        obj.lock();
        
        if (strMessage.length() > 0)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u msgLen: %d, msg:%s",
                m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity(), strMessage.length(), strMessage.c_str());

            if (m_WriteBuffer.size() > 40960) // Pending data size is greater than 40 MB
            {
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                    "WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, Disconnecting client due to pending buffer limit crossed",
                    m_SocketDescriptor, m_WriteBuffer.size());
                do_Disconnect();
            }
            else
            {
                boost::beast::ostream(m_WriteBuffer) << strMessage << ":";

                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                    "WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u",
                    m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity());

                ws_.async_write(m_WriteBuffer.data(),
                    [this](beast::error_code ec, std::size_t transfer)
                {
                    boost::ignore_unused(transfer);
                    if (ec)
                    {
                        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                            "WebSocketSession::SendMessageToClient callback error SocketID: %d, ErrorMsg:%s"/* ,Message: %s"*/, m_SocketDescriptor, ec.message().c_str()/*,strMessage.c_str()*/);

                        do_Disconnect();
                    }

                    obj.lock();
                    m_WriteBuffer.consume(transfer);
                    obj.unlock();
                });

                bRet = true;
            }
        }
        obj.unlock();
    }
    catch (exception& e)
    {
        obj.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        obj.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Generic Catch");
    }
    return bRet;
}

// Disconnect path. Without an event sink the session removes itself from the listener;
// with one it only reports OnConnect(false) to the sink.
void WebSocketSession::do_Disconnect()
{
    try
    {
        if (!m_pEvent)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::do_Disconnect Removing SocketID: %d", m_SocketDescriptor);
            objWebSocketListener->RemoveClient(m_SocketDescriptor);
            return;
        }

        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::do_Disconnect disconnecting SocketID: %d", m_SocketDescriptor);
        m_pEvent->OnConnect(m_SocketDescriptor, false, "", 0);
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_Disconnect Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_Disconnect Inside Generic Catch");
    }
}
windows visual-c++ boost crash boost-asio
1个回答
0
投票

我刚刚花了……几个小时来修复你的代码,使它能够编译。我记得在你发布 https://github.com/chriskohlhoff/asio/issues/1335 以及你之前的问题时,我也做过同样的事情。粗略的结果:Live On Coliru

代码中的问题数量高得吓人。

  • 套接字(socket)不是 UID,
  • 它们在进程的生命周期中并不是唯一的,
  • 本地互斥体不会保护任何东西,
  • 大多数格式说明符都是错误的,
  • 缺少一些格式参数,
  • 有些通过 varargs 传递重要对象来调用 UB,
  • 并发提示不是线程数,
  • 线程疯狂运行(分离),
  • strand 实际上应该消除对临界区(critical section)的需要,
  • maxThreads
    未使用,
  • 监听器的启动和设置事件处理程序总是存在竞争条件,
  • 所有会话共享
    g_upgrade_request
    变量 - 无论线程如何 - 这只是一场数据竞争,因此 UB;
  • WIN32 API、boost 和标准库线程原语的混合表明所有代码基本上都是从随机源复制粘贴的。

无论如何,你原来的堆栈跟踪(你删除了?)显示计时器完成。这意味着源是

ws_
超时的套接字操作(因为您没有任何其他计时器):

beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

这可能发生在

g_ioc
实例被破坏之后。这是完全符合预期的,因为所有 IO 线程都是...分离的,因此它们会在
main
退出后继续运行。

因此,请确保您的 main 在执行上下文耗尽工作之前不会退出。更好的是,无论如何不要将其设为全局。


朋友不要让朋友使用全局变量

只需保持基本的作用域卫生(scope hygiene)即可修复所有问题

  • 将大部分内容移至监听器中
  • 将升级请求移至会话中
  • 使用
    thread_pool
    来避免手动线程(也很差)
  • 添加等待
    Stop
  • 删除无用的互斥体,改用 strand —— 反正你已经在尝试使用它们了
  • 依靠
    shared_from_this
    (你……反正已经在使用了),
    在会话映射中存储
    weak_ptr
    。注意,这样一来会话映射
    也不再需要不安全的键
  • 改进和简化的错误处理

Live On Coliru

#if 0 // The following ifdef block is the standard way of creating macros which make exporting // from a DLL simpler. All files within this DLL are compiled with the WEBSOCKETSERVER_EXPORTS // symbol defined on the command line. This symbol should not be defined on any project // that uses this DLL. This way any other project whose source files include this file see // WEBSOCKETSERVER_API functions as being imported from a DLL, whereas this DLL sees symbols // defined with this macro as being exported. #ifdef WEBSOCKETSERVER_EXPORTS #define WEBSOCKETSERVER_API __declspec(dllexport) #else #define WEBSOCKETSERVER_API __declspec(dllimport) #endif #include "TSWebSocket.h" #include "server_certificate.hpp" #else #include <cstdarg> #include <cstdio> #define WEBSOCKETSERVER_API #define STDCALL // __stdcall namespace stl::log { enum { LOG_GROUP_ERROR }; enum { LOG_LEVEL_INFO, LOG_LEVEL_ERROR }; [[gnu::format(printf, 3, 4)]] void trace([[maybe_unused]] auto section, [[maybe_unused]] auto level, [[maybe_unused]] char const* fmt, ...) 
{ std::va_list argptr; va_start(argptr, fmt); std::vfprintf(stderr, fmt, argptr); std::puts(""); // poor man's newline va_end(argptr); } } // namespace stl::log #endif #include <algorithm> #include <boost/asio.hpp> #include <boost/beast.hpp> #include <boost/beast/ssl.hpp> #include <boost/lexical_cast.hpp> #include <iostream> #include <list> #include <mutex> #include <string_view> #include <thread> #include <vector> namespace beast = boost::beast; // from <boost/beast.hpp> namespace http = beast::http; // from <boost/beast/http.hpp> namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> namespace net = boost::asio; // from <boost/asio.hpp> namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp> using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> static void load_server_certificate(ssl::context& ctx, std::string, std::string) { // TODO ctx.use_certificate_file("server.pem", ssl::context::pem); ctx.use_private_key_file("server.pem", ssl::context::pem); ctx.set_default_verify_paths(); ctx.set_password_callback([](size_t, auto /*purpose*/) { return "test"; }); } using SOCKET = tcp::socket::native_handle_type; struct ISocketEvents { virtual ~ISocketEvents() = default; virtual void OnConnect(SOCKET, bool, std::string_view, int) { stl::log::trace(0, 0, "Event handler: %s\n", __FUNCTION__); } virtual void onHandShake(SOCKET, bool) { stl::log::trace(0, 0, "Event handler: %s\n", __FUNCTION__); } virtual void OnReceive(SOCKET, char const*, size_t) { stl::log::trace(0, 0, "Event handler: %s\n", __FUNCTION__); } }; //------------------------------------------------------------------------------ class WebSocketSession; // Accepts incoming connections and launches the sessions class WEBSOCKETSERVER_API WebSocketListener : public std::enable_shared_from_this<WebSocketListener> { private: using Handle = std::weak_ptr<WebSocketSession>; std::list<Handle> m_sessions; std::shared_ptr<ISocketEvents> m_pEvent; boost::asio::thread_pool m_ctx; 
tcp::acceptor m_acceptor{net::make_strand(m_ctx)}; public: // The SSL context is required, and holds certificates ssl::context m_sslctx{ ssl::context::tlsv12}; // TODO make private, probably by moving load_server_certificate WebSocketListener(tcp::endpoint endpoint, size_t num_threads); ~WebSocketListener(); void Run(); void Stop(); void setEventHandler(std::shared_ptr<ISocketEvents> pEvent); bool SendMessageToClient(SOCKET uClientid, std::string strMessage); private: void do_accept(); void on_accept(beast::error_code ec, tcp::socket socket); }; //------------------------------------------------------------------------------ class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> { websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_; beast::flat_buffer m_ReadBuffer; beast::flat_buffer m_WriteBuffer; std::shared_ptr<ISocketEvents> m_pEvent; bool bHandShake = false; http::request<http::string_body> m_upgrade_request; beast::websocket::close_reason m_closeReason{}; public: // Take ownership of the socket WebSocketSession(tcp::socket socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent); ~WebSocketSession(); void run(); void closeSocket(); void on_run(); void on_handshake(beast::error_code ec); void on_accept(beast::error_code ec); void on_upgrade(beast::error_code ec, size_t); void do_read(); void on_read(beast::error_code ec, size_t bytes_transferred); SOCKET getId(); void SendMessage(std::string strMessage); }; //------------------------------------------------------------------------------ // Factory function that creates instances of the Server Protocol object. 
WEBSOCKETSERVER_API std::shared_ptr<WebSocketListener> STDCALL InitializeWebSocketListener( // unsigned short port, int maxThreads, std::string certificate, std::string privateKey, std::shared_ptr<ISocketEvents> handlers); //-------- // // It initialize the WebSocketListener std::shared_ptr<WebSocketListener> STDCALL InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey, std::shared_ptr<ISocketEvents> handlers) { // Create and launch a listening port auto obj = std::make_shared<WebSocketListener>(tcp::endpoint{{}, port}, maxThreads); obj->setEventHandler(handlers); obj->Run(); load_server_certificate(obj->m_sslctx, certificate, privateKey); return obj; } //------------------------------------------------------------------------------ // Listener class WebSocketListener::~WebSocketListener() {} WebSocketListener::WebSocketListener(tcp::endpoint endpoint, size_t num_threads) : m_ctx(num_threads) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constructor Enter"); beast::error_code ec; // Open the acceptor m_acceptor.open(endpoint.protocol(), ec); if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener open failed, error message: %s", ec.message().c_str()); return; } // Allow address reuse m_acceptor.set_option(net::socket_base::reuse_address(true), ec); if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener set_option failed, error message: %s", ec.message().c_str()); return; } m_acceptor.set_option(tcp::no_delay(true), ec); if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener set_option(No delay) failed, error message: %s", ec.message().c_str()); return; } // Bind to the server address m_acceptor.bind(endpoint, ec); if (ec) { 
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener bind failed, error message: %s", ec.message().c_str()); return; } // Start listening for connections m_acceptor.listen(net::socket_base::max_listen_connections, ec); if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener listen failed, error message: %s", ec.message().c_str()); return; } else { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::WebSocketListener Started listening at %s:%hu", m_acceptor.local_endpoint().address().to_string().c_str(), m_acceptor.local_endpoint().port()); } stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constructor Leave"); } bool WebSocketListener::SendMessageToClient(SOCKET uClientid, std::string strMessage) { try { // TODO maybe make `m_sessions` a map again for (Handle const& handle : m_sessions) { if (auto session = handle.lock()) { if (session->getId() != uClientid) continue; session->SendMessage(strMessage); return true; } } stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient ClientID: %d not found", uClientid); } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Catch msg: %s", e.what()); } catch (...) 
{ stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Generic Catch"); } return false; } void WebSocketListener::setEventHandler(std::shared_ptr<ISocketEvents> pEvent) { net::post(m_acceptor.get_executor(), // post on strand [this, pEvent, self = shared_from_this()] { m_pEvent = pEvent; }); } // Start accepting incoming connections void WebSocketListener::Run() { net::post(m_acceptor.get_executor(), // post on strand [this, self = shared_from_this()] { // do_accept(); }); } void WebSocketListener::Stop() { net::post(m_acceptor.get_executor(), [this, self = shared_from_this()] { m_acceptor.cancel(); for (Handle& handle : m_sessions) if (auto session = handle.lock()) session->closeSocket(); }); m_ctx.join(); // waits for all IO to finish } void WebSocketListener::do_accept() { m_acceptor.async_accept( // net::make_strand(m_ctx), beast::bind_front_handler(&WebSocketListener::on_accept, shared_from_this())); } void WebSocketListener::on_accept(beast::error_code ec, tcp::socket socket) { try { if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept accept failed, error SocketID: %d, message: %s", socket.native_handle(), ec.message().c_str()); } else { // Accept another connection do_accept(); socket.set_option(tcp::no_delay(true), ec); if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept set_option(No delay) failed, error message: %s", ec.message().c_str()); return; } std::string sClientIp = socket.remote_endpoint().address().to_string(); unsigned short uiClientPort = socket.remote_endpoint().port(); // Create the WebSocketSession and run it auto session = std::make_shared<WebSocketSession>(std::move(socket), m_sslctx, m_pEvent); m_sessions.emplace_back(session); stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept insert client id : %d", 
session->getId()); // optionally: garbage collect expired sessions m_sessions.remove_if(std::mem_fn(&Handle::expired)); stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::on_accept Incoming connection request from SocketID: %d, " "IP: %s Port: %hu", session->getId(), sClientIp.c_str(), uiClientPort); session->run(); } } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Catch msg: %s", e.what()); } catch (...) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Generic Catch"); } } //------------------------------------------------------------------------------ // Session class WebSocketSession::WebSocketSession(tcp::socket socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent) : ws_(std::move(socket), ctx) , m_pEvent(pEvent) // { m_WriteBuffer.reserve(1000); m_ReadBuffer.reserve(1000); } WebSocketSession::~WebSocketSession() try { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d Closing ", getId()); if (ws_.is_open()) { beast::error_code ec; ws_.close(m_closeReason, ec); if (ec) stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d clsoing error msg: %s", getId(), ec.message().c_str()); if (m_pEvent) m_pEvent->OnConnect(getId(), false, "", 0); } } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "~WebSocketSession Inside Catch msg: %s", e.what()); } catch (...) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "~WebSocketSession Inside Generic Catch"); } // Get on the correct executor void WebSocketSession::run() { // We need to be executing within a strand to perform async operations // on the I/O objects in this WebSocketSession. 
Although not strictly necessary // for single-threaded contexts, this example code is written to be // thread-safe by default. net::dispatch(ws_.get_executor(), beast::bind_front_handler(&WebSocketSession::on_run, shared_from_this())); } void WebSocketSession::closeSocket() { auto id = getId(); stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket Closing SocketID: %d", id); auto self = shared_from_this(); //// ideally we cleanly close, however this will not reliably close the pending read // net::post(ws_.get_executor(), [this, self] { get_lowest_layer(ws_).cancel(); }); net::post(ws_.get_executor(), [this, self, id] { // m_closeReason must stay valid! ws_.async_close(m_closeReason, [self, id](beast::error_code ec) { if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket SocketID: %d Message: %s", id, ec.message().c_str()); } }); }); } // Start the asynchronous operation void WebSocketSession::on_run() { // Set the timeout. beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); // Perform the SSL handshake ws_.next_layer().async_handshake( ssl::stream_base::server, beast::bind_front_handler(&WebSocketSession::on_handshake, shared_from_this())); } void WebSocketSession::on_handshake(beast::error_code ec) { try { if (ec) { stl::log::trace( stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake handshake failed, error SocketID: %d, message: %s", getId(), ec.message().c_str()); return; } // Turn off the timeout on the tcp_stream, because // the websocket stream has its own timeout system. 
beast::get_lowest_layer(ws_).expires_never(); // Set suggested timeout settings for the websocket ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server)); // Set a decorator to change the Server of the handshake ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) { res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async-ssl"); })); http::async_read(ws_.next_layer(), m_ReadBuffer, m_upgrade_request, beast::bind_front_handler(&WebSocketSession::on_upgrade, shared_from_this())); } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Catch msg: %s", e.what()); } catch (...) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Generic Catch"); } } void WebSocketSession::on_upgrade(beast::error_code ec, size_t) { try { if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade upgrade failed, error SocketID: %d, message: %s", getId(), ec.message().c_str()); return; } stl::log::trace( stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_upgrade Handshake completed for SocketID: %d, WebSocket Headers: %s", getId(), boost::lexical_cast<std::string>(m_upgrade_request.base()).c_str()); if (m_pEvent) m_pEvent->OnConnect(getId(), true, "", 0); // Accept the websocket handshake ws_.async_accept(m_upgrade_request, beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this())); } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Catch msg: %s", e.what()); } catch (...) 
{ stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Generic Catch"); } } void WebSocketSession::on_accept(beast::error_code ec) { if (!ec) do_read(); // Read a message } void WebSocketSession::do_read() { try { if (m_pEvent && !bHandShake) { bHandShake = true; m_pEvent->onHandShake(getId(), bHandShake); } // Read a message into our buffer ws_.async_read(m_ReadBuffer, beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this())); } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Catch msg: %s", e.what()); } catch (...) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Generic Catch"); } } void WebSocketSession::on_read(beast::error_code ec, size_t bytes_transferred) { try { boost::ignore_unused(bytes_transferred); // This indicates that the WebSocketSession was closed if (ec) { if (ec == websocket::error::closed) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client closed SocketID: %d", getId()); } return; } if (auto iLen = m_ReadBuffer.cdata().size()) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client SocketID: %d dataRecv: %zu cpacity: %zu", getId(), iLen, m_ReadBuffer.capacity()); std::string data(net::buffer_cast<char const*>(m_ReadBuffer.cdata()), iLen); if (m_pEvent) m_pEvent->OnReceive(getId(), data.data(), iLen); } m_ReadBuffer.consume(bytes_transferred); } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Catch msg: %s", e.what()); } catch (...) 
{ stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Generic Catch"); } do_read(); } SOCKET WebSocketSession::getId() { return beast::get_lowest_layer(ws_).socket().native_handle(); } void WebSocketSession::SendMessage(std::string strMessage) { net::post(ws_.get_executor(), [this, msg = std::move(strMessage), self=shared_from_this()] { try { if (msg.length() > 0) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, " "BufferCapacity: %zu msgLen:%zud, msg:%s", getId(), m_WriteBuffer.size(), m_WriteBuffer.capacity(), msg.length(), msg.c_str()); if (m_WriteBuffer.size() > 40960) // Pending data size is greater than 40 MB { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, " "Disconnecting client due to pending buffer limit crossed", getId(), m_WriteBuffer.size()); } else { boost::beast::ostream(m_WriteBuffer) << msg << ":"; stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, " "BufferCapacity: %zu", getId(), m_WriteBuffer.size(), m_WriteBuffer.capacity()); ws_.async_write(m_WriteBuffer.data(), [this](beast::error_code ec, size_t transferred) { boost::ignore_unused(transferred); if (ec) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessage callback error SocketID: %d, " "ErrorMsg:%s" /* ,Message: %s"*/, getId(), ec.message().c_str() /*,msg.c_str()*/); } m_WriteBuffer.consume(transferred); }); } } } catch (std::exception const& e) { stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessage Inside Catch msg: %s", e.what()); } catch (...) 
{ stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessage Inside Generic Catch"); } }); } int main() { auto handlers = std::make_shared<ISocketEvents>(); auto wsl = InitializeWebSocketListener(8989, 50, "CERT", "KEY", handlers); std::this_thread::sleep_for(std::chrono::seconds(10)); wsl->Stop(); }
本地演示:

总结、注意事项

总而言之,要更加了解生命周期(请参阅

m_closeReason

 了解更微妙的生命周期)和关闭。

剩余问题:

  • closeSocket()

    部分取决于对等方根据Websocket规范积极配合。您可能想要扩展逻辑以最终强制关闭套接字

  • 您需要更多围绕

    SendMessage

    的安全措施 - 目前完全有可能发出重叠写入。这……不符合规范。您可以通过在其中加入一个传出消息队列来解决这个问题

© www.soinside.com 2019 - 2024. All rights reserved.