我是 Boost 库的新手。我使用 Boost.Beast 的异步(async)方式编写了一个服务器,但它有时会崩溃。这种情况并不常见,有时要运行 3-4 小时才会崩溃。下面分享的是我从应用程序崩溃转储文件中得到的崩溃点截图。
请帮我找出崩溃的原因。
我在谷歌上做了一些研究,还检查了这个链接“https://www.boost.org/doc/libs/1_54_0/boost/asio/detail/deadline_timer_service.hpp”,但无法理解它。
头文件WebSocketServer.h
#include "server_certificate.hpp"
#include "xlogger.h"
#include "TSWebSocket.h"
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/mutex.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <map>
#include <queue>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
// The following ifdef block is the standard way of creating macros which make exporting
// from a DLL simpler. All files within this DLL are compiled with the WEBSOCKETSERVER_EXPORTS
// symbol defined on the command line. This symbol should not be defined on any project
// that uses this DLL. This way any other project whose source files include this file see
// WEBSOCKETSERVER_API functions as being imported from a DLL, whereas this DLL sees symbols
// defined with this macro as being exported.
#ifdef WEBSOCKETSERVER_EXPORTS
#define WEBSOCKETSERVER_API __declspec(dllexport)
#else
#define WEBSOCKETSERVER_API __declspec(dllimport)
#endif
class WebSocketSession;
// Accepts incoming connections and launches the sessions.
// It also keeps a registry of the sessions it created (m_mapClientidSession) so
// that a client can be addressed by id through SendMessageToClient / RemoveClient.
class WEBSOCKETSERVER_API WebSocketListener : public std::enable_shared_from_this<WebSocketListener>
{
private:
// Event sink handed to every new WebSocketSession; replaced via setEventHandler.
std::shared_ptr<ISocketEvents> m_pEvent;
// Entered around the accesses to m_mapClientidSession in the implementation file.
CRITICAL_SECTION m_hCSClientidSession;
// Live sessions keyed by client id (the accepted socket's native handle, narrowed to UINT).
std::map<UINT, std::shared_ptr<WebSocketSession>> m_mapClientidSession;
public:
// Opens, configures, binds and starts listening on the global acceptor for `endpoint`.
WebSocketListener(tcp::endpoint endpoint);
~WebSocketListener();
// Starts the asynchronous accept loop.
void Run();
// Stores the event sink that will be passed to sessions created afterwards.
void setEventHandler(std::shared_ptr<ISocketEvents> pEvent);
// Forwards strMessage to the session registered under uClientid.
// Returns the session's result, or false when the id is unknown or an exception was caught.
bool SendMessageToClient(UINT uClientid, std::string strMessage);
// Starts closing the session registered under uClientid and erases it from the registry.
// Returns true when the id was found.
bool RemoveClient(UINT uClientid);
private:
// Issues one async_accept on the global acceptor.
void do_accept();
// Completion handler of do_accept: creates/registers the session and re-arms the accept.
void on_accept(beast::error_code ec, tcp::socket socket);
};
//------------------------------------------------------------------------------
// One TLS WebSocket connection. The asynchronous chain is
// run -> on_run -> on_handshake -> on_upgrade -> on_accept -> do_read <-> on_read;
// each pending operation holds a shared_ptr to the session obtained via shared_from_this().
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession>
{
// WebSocket over TLS over a Beast tcp_stream (which supplies the expires_after timeout).
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
// Receives the HTTP upgrade request and, afterwards, incoming WebSocket messages.
beast::flat_buffer m_ReadBuffer;
// Accumulates outgoing data until the write completion handler consumes it.
beast::flat_buffer m_WriteBuffer;
// Optional event sink (OnConnect / onHandShake / OnReceive); may be null.
std::shared_ptr<ISocketEvents> m_pEvent;
// Native handle captured at accept time; used as the client id in logs and callbacks.
SOCKET m_SocketDescriptor;
// Set to true once onHandShake has been reported, so it is reported only once.
bool bHandShake;
public:
// Take ownership of the socket
WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId);
// Dispatches on_run onto the stream's executor.
void run();
// Starts an asynchronous WebSocket close.
void closeSocket();
// Arms a 30 second timeout and starts the TLS server handshake.
void on_run();
// TLS handshake completed: configures WebSocket options and reads the HTTP upgrade request.
void on_handshake(beast::error_code ec);
// WebSocket accept completed: starts reading, or disconnects on error.
void on_accept(beast::error_code ec);
// Upgrade request read: reports OnConnect and accepts the WebSocket handshake.
void on_upgrade(beast::error_code ec, size_t);
// Reports onHandShake once, then starts an asynchronous message read.
void do_read();
// Message read completed: delivers the payload to OnReceive and reads again.
void on_read(beast::error_code ec, std::size_t bytes_transferred);
// Returns the client id (m_SocketDescriptor converted to UINT).
UINT getSocketDescriptor();
// Appends strMessage plus ':' to the write buffer and starts an asynchronous write.
// Returns true when a write was started.
bool SendMessageToClient(std::string strMessage);
// Reports the disconnect to the event sink, or removes the session from the listener when there is no sink.
void do_Disconnect();
~WebSocketSession();
};
//------------------------------------------------------------------------------
// Factory function that creates instances of the Server Protocol object.
WEBSOCKETSERVER_API std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey);
//------------------------------------------------------------------------------
源文件WebSocketServer.cpp
#include "stdafx.h"
#include "WebSocketServer.h"
//------------------------------------------------------------------------------
// Used as the io_context concurrency hint and as the number of I/O threads started
// by InitializeWebSocketListener.
#define NO_OF_THREAD 50
// The single listener created by InitializeWebSocketListener; sessions reach it
// from do_Disconnect to remove themselves.
static std::shared_ptr<WebSocketListener> objWebSocketListener = nullptr;
// The io_context is required for all I/O
boost::asio::io_context g_ioc{ NO_OF_THREAD };
// The SSL context is required, and holds certificates
ssl::context g_ctx{ ssl::context::tlsv12 };
// Listening socket; its handlers run on a dedicated strand of g_ioc.
tcp::acceptor g_acceptor(net::make_strand(g_ioc));
// HTTP upgrade request filled by on_handshake and read by on_upgrade.
// NOTE(review): this single object is used by every session, so concurrent
// handshakes read and write it at the same time — confirm and consider a
// per-session member instead.
http::request<http::string_body> g_upgrade_request;
// Thread entry point that runs g_ioc (defined below).
void RunIOContextThread();
// Creates the process-wide WebSocketListener, starts accepting on `port` on all
// IPv4 interfaces and launches the threads that run the global io_context.
//
//   port        - TCP port to listen on.
//   maxThreads  - number of I/O threads to start; values <= 0 fall back to NO_OF_THREAD.
//   certificate - forwarded to load_server_certificate.
//   privateKey  - forwarded to load_server_certificate.
//
// Returns the listener, which is also kept in objWebSocketListener.
//
// NOTE(review): the I/O threads are detached and run the global g_ioc, so they can
// still be executing handlers while globals are destroyed at process exit.
std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey)
{
    auto const address = net::ip::make_address("0.0.0.0");

    // Configure the certificate/key before any connection can be accepted.
    load_server_certificate(g_ctx, certificate, privateKey);

    // Create and launch a listening port. make_shared throws on failure and never
    // returns null, so no null check is needed before using the result.
    objWebSocketListener = std::make_shared<WebSocketListener>(tcp::endpoint{ address, port });
    objWebSocketListener->Run();

    // Run the I/O service on the requested number of threads.
    const int threadCount = (maxThreads > 0) ? maxThreads : NO_OF_THREAD;
    for (int i = 0; i < threadCount; ++i)
    {
        std::thread ioContextThread(&RunIOContextThread);
        ioContextThread.detach();
    }
    return objWebSocketListener;
}
// Entry point of every I/O thread: logs a tag for the thread and then runs the
// global io_context until it runs out of work or is stopped.
void RunIOContextThread()
{
    // std::thread::id is a class type and cannot be passed to a printf-style "%d";
    // log an integer derived from it instead.
    const int threadTag = static_cast<int>(std::hash<std::thread::id>()(std::this_thread::get_id()));
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "RunIOContextThread thread started %d", threadTag);
    g_ioc.run();
}
//------------------------------------------------------------------------------
// Listener class
// Releases the critical section that the constructor initialised for the
// client-id/session map.
WebSocketListener::~WebSocketListener()
{
    DeleteCriticalSection(&m_hCSClientidSession);
}
// Prepares the global acceptor for `endpoint`: open, set reuse_address and
// no_delay, bind, listen. Every step is logged on failure and aborts the
// remaining steps; the object is still constructed in that case.
WebSocketListener::WebSocketListener(tcp::endpoint endpoint)
    : m_pEvent(nullptr)
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Enter");
    InitializeCriticalSection(&m_hCSClientidSession);

    beast::error_code status;

    // Step 1: open the acceptor for the endpoint's protocol.
    g_acceptor.open(endpoint.protocol(), status);
    if (status)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketListener::WebSocketListener open failed, error message: %s", status.message().c_str());
        return;
    }

    // Step 2: allow the address to be reused.
    g_acceptor.set_option(net::socket_base::reuse_address(true), status);
    if (status)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketListener::WebSocketListener set_option failed, error message: %s", status.message().c_str());
        return;
    }

    // Step 3: request TCP_NODELAY on the listening socket.
    g_acceptor.set_option(tcp::no_delay(true), status);
    if (status)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketListener::WebSocketListener set_option(No delay) failed, error message: %s", status.message().c_str());
        return;
    }

    // Step 4: bind to the server address.
    g_acceptor.bind(endpoint, status);
    if (status)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketListener::WebSocketListener bind failed, error message: %s", status.message().c_str());
        return;
    }

    // Step 5: start listening for connections.
    g_acceptor.listen(net::socket_base::max_listen_connections, status);
    if (status)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketListener::WebSocketListener listen failed, error message: %s", status.message().c_str());
        return;
    }

    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
        "WebSocketListener::WebSocketListener Started listening at %s:%hu",
        g_acceptor.local_endpoint().address().to_string().c_str(),
        g_acceptor.local_endpoint().port());
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Leave");
}
// Starts closing the session registered under uClientid and erases it from the
// session map.
//
// Returns true when the id was found, false when it is unknown or an exception
// was caught (exceptions are logged, never propagated).
bool WebSocketListener::RemoveClient(UINT uClientid)
{
    // Scope guard: the critical section is left on every exit path, including
    // when closeSocket() or the logger throws.
    struct SessionMapLock
    {
        explicit SessionMapLock(CRITICAL_SECTION& cs) : m_cs(cs) { EnterCriticalSection(&m_cs); }
        ~SessionMapLock() { LeaveCriticalSection(&m_cs); }
        SessionMapLock(const SessionMapLock&) = delete;
        SessionMapLock& operator=(const SessionMapLock&) = delete;
        CRITICAL_SECTION& m_cs;
    };

    bool bRet = false;
    try
    {
        SessionMapLock lock(m_hCSClientidSession);
        auto iter = m_mapClientidSession.find(uClientid);
        if (iter != m_mapClientidSession.end())
        {
            iter->second->closeSocket();
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::RemoveClient Removed from map uClientid : %d refCount: %d", uClientid, iter->second.use_count());
            m_mapClientidSession.erase(iter);
            bRet = true;
        }
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Generic Catch");
    }
    return bRet;
}
// Looks up the session registered under uClientid and hands it strMessage.
// The session map is accessed with m_hCSClientidSession held.
// Returns what the session's SendMessageToClient returned; false when the id is
// unknown or an exception was caught and logged.
bool WebSocketListener::SendMessageToClient(UINT uClientid, std::string strMessage)
{
    bool delivered = false;
    try
    {
        EnterCriticalSection(&m_hCSClientidSession);
        const auto found = m_mapClientidSession.find(uClientid);
        if (found == m_mapClientidSession.end())
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient ClientID: %d not found", uClientid);
        }
        else
        {
            delivered = found->second->SendMessageToClient(strMessage);
        }
        LeaveCriticalSection(&m_hCSClientidSession);
    }
    catch (exception& e)
    {
        // The throw happened while the critical section was held; release it first.
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Generic Catch");
    }
    return delivered;
}
// Replaces the event sink that is passed to sessions created from now on.
void WebSocketListener::setEventHandler(std::shared_ptr<ISocketEvents> pEvent)
{
    // pEvent is our own copy, so it can be moved into the member.
    m_pEvent = std::move(pEvent);
}
// Start accepting incoming connections
void WebSocketListener::Run()
{
do_accept();
}
// Issues one asynchronous accept on the global acceptor. The accepted socket is
// bound to a fresh strand of g_ioc, and on_accept is invoked on completion.
// Exceptions thrown while starting the operation are logged and swallowed.
void WebSocketListener::do_accept()
{
    try
    {
        g_acceptor.async_accept(
            net::make_strand(g_ioc), // the new connection gets its own strand
            beast::bind_front_handler(&WebSocketListener::on_accept, shared_from_this()));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketListener::do_accept Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketListener::do_accept Inside Generic Catch");
    }
}
// Completion handler of do_accept.
//
//   ec     - result of the accept operation.
//   socket - the accepted connection (only meaningful when ec is not set).
//
// On success the socket is switched to no_delay, wrapped in a WebSocketSession,
// registered in the session map under its native handle, and started.
// Whatever happens (accept error, set_option error, exception), the next accept
// is always issued so that one failed connection cannot stop the server from
// accepting.
void WebSocketListener::on_accept(beast::error_code ec, tcp::socket socket)
{
    // Scope guard so the critical section is released even if map insertion or
    // logging throws.
    struct SessionMapLock
    {
        explicit SessionMapLock(CRITICAL_SECTION& cs) : m_cs(cs) { EnterCriticalSection(&m_cs); }
        ~SessionMapLock() { LeaveCriticalSection(&m_cs); }
        SessionMapLock(const SessionMapLock&) = delete;
        SessionMapLock& operator=(const SessionMapLock&) = delete;
        CRITICAL_SECTION& m_cs;
    };

    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept accept failed, error SocketID: %d, message: %s", socket.native_handle(), ec.message().c_str());
        }
        else
        {
            socket.set_option(tcp::no_delay(true), ec);
            if (ec)
            {
                // Drop this connection only; fall through to re-arm the accept below.
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept set_option(No delay) failed, error message: %s", ec.message().c_str());
            }
            else
            {
                std::string sClientIp = socket.remote_endpoint().address().to_string();
                unsigned short uiClientPort = socket.remote_endpoint().port();
                SOCKET socketId = socket.native_handle();

                // Create the WebSocketSession
                std::shared_ptr<WebSocketSession> objSession = std::make_shared<WebSocketSession>(std::move(socket), g_ctx, m_pEvent, socketId);

                // Register the session BEFORE starting it, so a session that
                // disconnects immediately is already present for RemoveClient.
                {
                    SessionMapLock lock(m_hCSClientidSession);
                    auto itr = m_mapClientidSession.find(socketId);
                    if (itr != m_mapClientidSession.end())
                    {
                        itr->second = objSession;
                        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept update client id : %d", socketId);
                    }
                    else
                    {
                        m_mapClientidSession.insert(std::pair<unsigned int, std::shared_ptr<WebSocketSession>>(socketId, objSession));
                        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept insert client id : %d", socketId);
                    }
                }

                objSession->run();
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::on_accept Incoming connection request from SocketID: %d, IP: %s Port: %hu", socketId, sClientIp.c_str(), uiClientPort);
            }
        }
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Generic Catch");
    }
    // Accept another connection
    do_accept();
}
//------------------------------------------------------------------------------
// Session class
// Wraps the accepted socket in a TLS WebSocket stream and remembers the event
// sink and the socket id. Both buffers get an initial reservation of 1000 bytes.
WebSocketSession::WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId)
    : ws_(std::move(socket), ctx)
    , m_pEvent(pEvent)
    , m_SocketDescriptor(socketId)
    , bHandShake(false)
{
    m_ReadBuffer.reserve(1000);
    m_WriteBuffer.reserve(1000);
}
// Logs the teardown and, if the WebSocket is still open, closes it
// synchronously using the non-throwing overload; a close error is only logged.
WebSocketSession::~WebSocketSession()
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d Closing ", m_SocketDescriptor);
    if (ws_.is_open())
    {
        beast::websocket::close_reason closeReason;
        beast::error_code ec;
        ws_.close(closeReason, ec);
        if (ec)
        {
            // The format string has two conversions (%d and %s), so both the
            // socket id and the message must be supplied.
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d clsoing error msg: %s", m_SocketDescriptor, ec.message().c_str());
        }
    }
}
// Get on the correct executor
// Entry point of a session: hops onto the stream's executor and continues in
// on_run there. Exceptions thrown while dispatching are logged and swallowed.
void WebSocketSession::run()
{
    try
    {
        // All asynchronous operations on this session's I/O objects are started
        // from its executor, so on_run is dispatched there instead of being
        // called directly.
        auto startOnExecutor = beast::bind_front_handler(&WebSocketSession::on_run, shared_from_this());
        net::dispatch(ws_.get_executor(), std::move(startOnExecutor));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::run Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::run Inside Generic Catch");
    }
}
void WebSocketSession::closeSocket()
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket Closing SocketID: %d", m_SocketDescriptor);
beast::websocket::close_reason closeReason;
beast::error_code ec;
UINT ID = m_SocketDescriptor;
ws_.async_close(closeReason, [=](beast::error_code ec)
{
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket SocketID: %d, Code: %d Message: %s", ID, closeReason, ec.message().c_str());
}
}
);
}
// Start the asynchronous operation
// First step on the session's executor: arms a 30 second timeout on the
// underlying tcp_stream and starts the TLS server handshake, which completes in
// on_handshake. Exceptions are logged and swallowed.
void WebSocketSession::on_run()
{
    try
    {
        auto& transport = beast::get_lowest_layer(ws_);
        transport.expires_after(std::chrono::seconds(30)); // handshake deadline

        auto onTlsDone = beast::bind_front_handler(&WebSocketSession::on_handshake, shared_from_this());
        ws_.next_layer().async_handshake(ssl::stream_base::server, std::move(onTlsDone));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::on_run Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::on_run Inside Generic Catch");
    }
}
// Completion handler of the TLS handshake.
// On failure the error is logged and the chain stops. On success the tcp_stream
// timeout is disabled, the WebSocket timeout and "Server" header decorator are
// installed, and the HTTP upgrade request is read (continues in on_upgrade).
//
// NOTE(review): the request is parsed into the file-scope g_upgrade_request,
// which every session uses — concurrent handshakes would touch it at the same
// time; confirm and consider a per-session request object.
void WebSocketSession::on_handshake(beast::error_code ec)
{
    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::on_handshake handshake failed, error SocketID: %d, message: %s",
                m_SocketDescriptor, ec.message().c_str());
            return;
        }

        // The websocket stream has its own timeout system, so the tcp_stream
        // deadline armed in on_run is switched off.
        beast::get_lowest_layer(ws_).expires_never();

        // Suggested timeout settings for a server-side websocket.
        ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));

        // Decorate the handshake response with our Server header.
        auto stampServerHeader = [](websocket::response_type& res)
        {
            res.set(http::field::server,
                std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async-ssl");
        };
        ws_.set_option(websocket::stream_base::decorator(stampServerHeader));

        // Read the HTTP upgrade request through the TLS layer.
        http::async_read(ws_.next_layer(), m_ReadBuffer, g_upgrade_request,
            beast::bind_front_handler(&WebSocketSession::on_upgrade, shared_from_this()));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::on_handshake Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::on_handshake Inside Generic Catch");
    }
}
// Completion handler of the HTTP upgrade-request read.
// On failure the error is logged and the chain stops. On success the request
// headers are logged, the event sink (if any) is told about the connection, and
// the WebSocket handshake is accepted (continues in on_accept).
// The second parameter (bytes transferred) is unused.
void WebSocketSession::on_upgrade(beast::error_code ec, size_t)
{
    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade upgrade failed, error SocketID: %d, message: %s", m_SocketDescriptor, ec.message().c_str());
            return;
        }
        // "%s" needs a C string: keep the std::string alive in a local and pass
        // c_str() rather than the std::string object itself.
        const std::string headers = boost::lexical_cast<std::string>(g_upgrade_request.base());
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_upgrade Handshake completed for SocketID: %d, WebSocket Headers: %s", m_SocketDescriptor, headers.c_str());
        if (m_pEvent != nullptr)
        {
            m_pEvent->OnConnect(m_SocketDescriptor, true, "", 0);
        }
        // Accept the websocket handshake
        ws_.async_accept(
            g_upgrade_request,
            beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Generic Catch");
    }
}
// Completion handler of the WebSocket accept: start reading messages on
// success, otherwise run the disconnect path.
void WebSocketSession::on_accept(beast::error_code ec)
{
    if (!ec)
    {
        do_read(); // read a message
        return;
    }
    do_Disconnect();
}
void WebSocketSession::do_read()
{
static boost::mutex objMutex;
try
{
objMutex.lock();
if (m_pEvent && !bHandShake)
{
bHandShake = true;
m_pEvent->onHandShake(m_SocketDescriptor, bHandShake);
}
// Read a message into our buffer
ws_.async_read(
m_ReadBuffer,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()));
objMutex.unlock();
}
catch (exception& e)
{
objMutex.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Catch msg: %s", e.what());
}
catch (...)
{
objMutex.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Generic Catch");
}
}
// Completion handler of a message read.
//
//   ec                - result of the read; any error (including a clean close)
//                       runs the disconnect path and ends the read loop.
//   bytes_transferred - number of bytes the read appended to m_ReadBuffer.
//
// On success the buffered bytes are copied out, bytes_transferred bytes are
// consumed from m_ReadBuffer, and only then is the copy handed to
// OnReceive. Consuming first means a throwing handler cannot leave the bytes in
// the buffer to be delivered again together with the next message.
// Afterwards the next read is started.
void WebSocketSession::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
    try
    {
        // This indicates that the WebSocketSession was closed
        if (ec == websocket::error::closed)
        {
            do_Disconnect();
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client closed SocketID: %d", m_SocketDescriptor);
            return;
        }
        if (ec)
        {
            do_Disconnect();
            return;
        }

        const unsigned int iLen = static_cast<unsigned int>(m_ReadBuffer.cdata().size());
        std::string payload;
        if (m_pEvent != nullptr && iLen > 0)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client SocketID: %d dataRecv: %d cpacity: %d", m_SocketDescriptor, iLen, m_ReadBuffer.capacity());
            // Private, mutable copy for the handler (the buffer is reused by the next read).
            payload.assign(static_cast<const char*>(m_ReadBuffer.cdata().data()), iLen);
        }

        m_ReadBuffer.consume(bytes_transferred);

        if (!payload.empty())
        {
            m_pEvent->OnReceive(m_SocketDescriptor, &payload[0], iLen);
        }
    }
    catch (const std::exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Generic Catch");
    }
    do_read();
}
// Returns the socket id captured at construction, converted to UINT.
UINT WebSocketSession::getSocketDescriptor()
{
    const UINT clientId = m_SocketDescriptor;
    return clientId;
}
// Deleter helper: releases a buffer that was allocated with new char[].
void testDelete(char* p) { delete[] p; }
// Deleter functor for buffers allocated with new char[]; logs each release.
struct DeleteChar
{
    void operator()(char* p) const
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "Deleting the buffer");
        delete[] p;
    }
};
bool WebSocketSession::SendMessageToClient(std::string strMessage)
{
bool bRet = false;
static std::mutex obj;
try {
obj.lock();
if (strMessage.length() > 0)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u msgLen: %d, msg:%s",
m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity(), strMessage.length(), strMessage.c_str());
if (m_WriteBuffer.size() > 40960) // Pending data size is greater than 40 MB
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, Disconnecting client due to pending buffer limit crossed",
m_SocketDescriptor, m_WriteBuffer.size());
do_Disconnect();
}
else
{
boost::beast::ostream(m_WriteBuffer) << strMessage << ":";
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u",
m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity());
ws_.async_write(m_WriteBuffer.data(),
[this](beast::error_code ec, std::size_t transfer)
{
boost::ignore_unused(transfer);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient callback error SocketID: %d, ErrorMsg:%s"/* ,Message: %s"*/, m_SocketDescriptor, ec.message().c_str()/*,strMessage.c_str()*/);
do_Disconnect();
}
obj.lock();
m_WriteBuffer.consume(transfer);
obj.unlock();
});
bRet = true;
}
}
obj.unlock();
}
catch (exception& e)
{
obj.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Catch msg: %s", e.what());
}
catch (...)
{
obj.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Generic Catch");
}
return bRet;
}
// Disconnect path of a session. With an event sink installed, the sink is told
// that the client went away (OnConnect with `false`); without one, the session
// asks the global listener to remove it. Exceptions are logged and swallowed.
void WebSocketSession::do_Disconnect()
{
    try
    {
        if (!m_pEvent)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::do_Disconnect Removing SocketID: %d", m_SocketDescriptor);
            objWebSocketListener->RemoveClient(m_SocketDescriptor);
        }
        else
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::do_Disconnect disconnecting SocketID: %d", m_SocketDescriptor);
            m_pEvent->OnConnect(m_SocketDescriptor, false, "", 0);
        }
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::do_Disconnect Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "WebSocketSession::do_Disconnect Inside Generic Catch");
    }
}
我刚刚花了……几个小时来修复你的代码,才让它能够编译。我记得之前在 https://github.com/chriskohlhoff/asio/issues/1335 以及你先前的提问中也做过同样的事情。粗略的结果:Live On Coliru。
代码中的问题数量高得吓人。
例如:`UID`、`maxThreads` 都未被使用;全局的 `g_upgrade_request` 变量被所有会话共享——无论有多少线程——这就是一场数据竞争,因此是未定义行为(UB)。
无论如何,你原来的堆栈跟踪(你把它删掉了?)显示的是计时器完成处理。这意味着来源是 `ws_` 上带超时的套接字操作(因为你没有使用任何其他计时器):
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
这可能发生在 `g_ioc` 实例被销毁之后。这完全在意料之中,因为所有 IO 线程都被……分离(detach)了,所以它们会在 `main` 退出之后继续运行。
因此,请确保在执行上下文的工作耗尽之前,你的 `main` 不会退出。更好的做法是,干脆不要把它设为全局变量。
只需应用基本的作用域(scope)卫生即可修复所有这些问题:
使用 `thread_pool` 来避免手动管理线程(原来的做法也很糟糕);添加 `Stop`;使用 `shared_from_this`(你……反正本来就在用),并在会话映射中存储 `weak_ptr`。注意,会话映射也不再需要不安全的键。
#if 0
// The following ifdef block is the standard way of creating macros which make exporting
// from a DLL simpler. All files within this DLL are compiled with the WEBSOCKETSERVER_EXPORTS
// symbol defined on the command line. This symbol should not be defined on any project
// that uses this DLL. This way any other project whose source files include this file see
// WEBSOCKETSERVER_API functions as being imported from a DLL, whereas this DLL sees symbols
// defined with this macro as being exported.
#ifdef WEBSOCKETSERVER_EXPORTS
#define WEBSOCKETSERVER_API __declspec(dllexport)
#else
#define WEBSOCKETSERVER_API __declspec(dllimport)
#endif
#include "TSWebSocket.h"
#include "server_certificate.hpp"
#else
#include <cstdarg>
#include <cstdio>
#define WEBSOCKETSERVER_API
#define STDCALL // __stdcall
// Minimal stand-in for the project's logger so the listing is self-contained.
namespace stl::log {
    enum { LOG_GROUP_ERROR };
    enum { LOG_LEVEL_INFO, LOG_LEVEL_ERROR };

    /// Writes a printf-style formatted message followed by a newline to stderr.
    /// `section` and `level` are accepted for call-site compatibility and ignored.
    [[gnu::format(printf, 3, 4)]] void trace([[maybe_unused]] auto section, [[maybe_unused]] auto level,
                                             [[maybe_unused]] char const* fmt, ...) {
        std::va_list argptr;
        va_start(argptr, fmt);
        std::vfprintf(stderr, fmt, argptr);
        // Terminate the line on the same stream as the message; std::puts would
        // send the newline to stdout instead.
        std::fputc('\n', stderr);
        va_end(argptr);
    }
} // namespace stl::log
#endif
#include <algorithm>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <list>
#include <mutex>
#include <string_view>
#include <thread>
#include <vector>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// Placeholder certificate loader for the self-contained listing: loads both the
// certificate and the private key from "server.pem" and ignores the two string
// arguments (TODO: use them).
//
// The password callback is installed BEFORE the key file is loaded so that it is
// in place if the key turns out to be passphrase-protected.
static void load_server_certificate(ssl::context& ctx, std::string, std::string) {
    // TODO
    ctx.set_password_callback([](size_t, auto /*purpose*/) { return "test"; });
    ctx.use_certificate_file("server.pem", ssl::context::pem);
    ctx.use_private_key_file("server.pem", ssl::context::pem);
    ctx.set_default_verify_paths();
}
using SOCKET = tcp::socket::native_handle_type;
// Callback interface used by the listener and its sessions. The default
// implementations only log the name of the callback that was invoked.
struct ISocketEvents {
    virtual ~ISocketEvents() = default;

    // Connection state change for a socket (true on connect, false on disconnect).
    virtual void OnConnect(SOCKET, bool, std::string_view, int) { logInvocation(__FUNCTION__); }

    // WebSocket handshake finished for a socket.
    virtual void onHandShake(SOCKET, bool) { logInvocation(__FUNCTION__); }

    // A message (pointer + length) was received on a socket.
    virtual void OnReceive(SOCKET, char const*, size_t) { logInvocation(__FUNCTION__); }

  private:
    // Shared body of the default handlers.
    static void logInvocation(char const* handlerName) {
        stl::log::trace(0, 0, "Event handler: %s\n", handlerName);
    }
};
//------------------------------------------------------------------------------
class WebSocketSession;
// Accepts incoming connections and launches the sessions.
// Owns the execution context (a thread pool), the acceptor and the SSL context,
// and tracks the sessions it created through weak handles.
class WEBSOCKETSERVER_API WebSocketListener : public std::enable_shared_from_this<WebSocketListener> {
private:
// Non-owning handle to a session; an expired handle means the session is gone.
using Handle = std::weak_ptr<WebSocketSession>;
// Sessions created by on_accept; expired entries are pruned there.
std::list<Handle> m_sessions;
// Event sink handed to every new session; assigned on the acceptor's strand.
std::shared_ptr<ISocketEvents> m_pEvent;
// Thread pool that runs all handlers. Declared before m_acceptor because the
// acceptor's initializer below uses it.
boost::asio::thread_pool m_ctx;
// Listening socket; its handlers run on a dedicated strand of m_ctx.
tcp::acceptor m_acceptor{net::make_strand(m_ctx)};
public:
// The SSL context is required, and holds certificates
ssl::context m_sslctx{
ssl::context::tlsv12}; // TODO make private, probably by moving load_server_certificate
// Opens, configures, binds and starts listening on `endpoint`; the pool gets num_threads threads.
WebSocketListener(tcp::endpoint endpoint, size_t num_threads);
~WebSocketListener();
// Posts the first accept onto the acceptor's strand.
void Run();
// Cancels the accept, asks every live session to close, then joins the pool.
void Stop();
// Posts the assignment of the event sink onto the acceptor's strand.
void setEventHandler(std::shared_ptr<ISocketEvents> pEvent);
// Sends strMessage to the live session whose id equals uClientid; returns true when one was found.
bool SendMessageToClient(SOCKET uClientid, std::string strMessage);
private:
// Issues one async_accept; the accepted socket gets its own strand.
void do_accept();
// Completion handler of do_accept: re-arms the accept and creates/starts the session.
void on_accept(beast::error_code ec, tcp::socket socket);
};
//------------------------------------------------------------------------------
// One TLS WebSocket connection of the reworked server. Unlike the original
// listing, the upgrade request and the close reason are per-session members.
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
// WebSocket over TLS over a Beast tcp_stream.
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
// Buffer for incoming data.
beast::flat_buffer m_ReadBuffer;
// Buffer for outgoing data.
beast::flat_buffer m_WriteBuffer;
// Optional event sink; may be null.
std::shared_ptr<ISocketEvents> m_pEvent;
// Whether the handshake notification has already been delivered.
bool bHandShake = false;
// HTTP upgrade request of this session.
http::request<http::string_body> m_upgrade_request;
// Close reason of this session (value-initialised).
beast::websocket::close_reason m_closeReason{};
public:
// Take ownership of the socket
WebSocketSession(tcp::socket socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent);
~WebSocketSession();
// Starts the session's asynchronous chain.
void run();
// Starts closing the connection.
void closeSocket();
void on_run();
void on_handshake(beast::error_code ec);
void on_accept(beast::error_code ec);
void on_upgrade(beast::error_code ec, size_t);
void do_read();
void on_read(beast::error_code ec, size_t bytes_transferred);
// Identifier of this session; the listener compares it with the client id and logs it.
SOCKET getId();
// Sends strMessage to the peer.
void SendMessage(std::string strMessage);
};
//------------------------------------------------------------------------------
// Factory function that creates instances of the Server Protocol object.
WEBSOCKETSERVER_API std::shared_ptr<WebSocketListener> STDCALL InitializeWebSocketListener( //
unsigned short port, int maxThreads, std::string certificate, std::string privateKey,
std::shared_ptr<ISocketEvents> handlers);
//--------
//
// Creates a WebSocketListener on `port` (default-constructed address), configures
// its SSL context, installs the event handlers and starts accepting.
//
//   port        - TCP port to listen on.
//   maxThreads  - size of the listener's thread pool; values below 1 are raised to 1.
//   certificate - forwarded to load_server_certificate.
//   privateKey  - forwarded to load_server_certificate.
//   handlers    - event sink passed to the listener.
//
// Returns the running listener.
std::shared_ptr<WebSocketListener>
STDCALL InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate,
                                    std::string privateKey, std::shared_ptr<ISocketEvents> handlers) {
    // A negative int would convert to an enormous size_t thread count.
    auto const threads = static_cast<size_t>(std::max(1, maxThreads));

    // Create the listening port
    auto obj = std::make_shared<WebSocketListener>(tcp::endpoint{{}, port}, threads);

    // Configure TLS before the accept loop starts: once Run() has been posted,
    // pool threads may create sessions that use m_sslctx.
    load_server_certificate(obj->m_sslctx, certificate, privateKey);

    obj->setEventHandler(handlers);
    obj->Run();
    return obj;
}
//------------------------------------------------------------------------------
// Listener class
// Nothing to do explicitly; the members clean up after themselves.
WebSocketListener::~WebSocketListener() = default;
// Creates the thread pool with num_threads threads and prepares the acceptor for
// `endpoint`: open, set reuse_address and no_delay, bind, listen. Each failing
// step is logged and aborts the remaining steps; the object is still constructed.
WebSocketListener::WebSocketListener(tcp::endpoint endpoint, size_t num_threads)
    : m_ctx(num_threads)
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                    "WebSocketListener::WebSocketListener Constructor Enter");

    beast::error_code status;

    // Step 1: open the acceptor for the endpoint's protocol.
    m_acceptor.open(endpoint.protocol(), status);
    if (status) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::WebSocketListener open failed, error message: %s",
                        status.message().c_str());
        return;
    }

    // Step 2: allow the address to be reused.
    m_acceptor.set_option(net::socket_base::reuse_address(true), status);
    if (status) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::WebSocketListener set_option failed, error message: %s",
                        status.message().c_str());
        return;
    }

    // Step 3: request TCP_NODELAY on the listening socket.
    m_acceptor.set_option(tcp::no_delay(true), status);
    if (status) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::WebSocketListener set_option(No delay) failed, error message: %s",
                        status.message().c_str());
        return;
    }

    // Step 4: bind to the server address.
    m_acceptor.bind(endpoint, status);
    if (status) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::WebSocketListener bind failed, error message: %s",
                        status.message().c_str());
        return;
    }

    // Step 5: start listening for connections.
    m_acceptor.listen(net::socket_base::max_listen_connections, status);
    if (status) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::WebSocketListener listen failed, error message: %s",
                        status.message().c_str());
        return;
    }

    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
                    "WebSocketListener::WebSocketListener Started listening at %s:%hu",
                    m_acceptor.local_endpoint().address().to_string().c_str(),
                    m_acceptor.local_endpoint().port());
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                    "WebSocketListener::WebSocketListener Constructor Leave");
}
// Sends strMessage to the first live session whose id equals uClientid.
// Returns true when such a session was found; false when none matched or an
// exception was caught and logged.
//
// NOTE(review): m_sessions is read here on the caller's thread while on_accept
// modifies it from the acceptor's handlers — confirm whether callers are
// expected to be on that strand.
bool WebSocketListener::SendMessageToClient(SOCKET uClientid, std::string strMessage) {
    try {
        // TODO maybe make `m_sessions` a map again
        for (Handle const& weak : m_sessions) {
            auto live = weak.lock();
            if (!live)
                continue; // session already destroyed
            if (live->getId() == uClientid) {
                live->SendMessage(strMessage);
                return true;
            }
        }
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::SendMessageToClient ClientID: %d not found", uClientid);
    } catch (std::exception const& e) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::SendMessageToClient Inside Catch msg: %s", e.what());
    } catch (...) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::SendMessageToClient Inside Generic Catch");
    }
    return false;
}
// Installs the event sink. The assignment is posted onto the acceptor's strand;
// the posted task keeps the listener alive until it has run.
void WebSocketListener::setEventHandler(std::shared_ptr<ISocketEvents> pEvent) {
    auto self = shared_from_this();
    auto assign = [this, pEvent, self] { m_pEvent = pEvent; };
    net::post(m_acceptor.get_executor(), std::move(assign)); // post on strand
}
// Start accepting incoming connections
// Starts the accept loop by posting the first do_accept onto the acceptor's
// strand; the posted task keeps the listener alive until it has run.
void WebSocketListener::Run() {
    auto self = shared_from_this();
    auto start = [this, self] { do_accept(); };
    net::post(m_acceptor.get_executor(), std::move(start)); // post on strand
}
void WebSocketListener::Stop() {
net::post(m_acceptor.get_executor(), [this, self = shared_from_this()] {
m_acceptor.cancel();
for (Handle& handle : m_sessions)
if (auto session = handle.lock())
session->closeSocket();
});
m_ctx.join(); // waits for all IO to finish
}
// Issues one asynchronous accept. The accepted socket is bound to a fresh strand
// of the thread pool; on_accept runs on completion and keeps the listener alive.
void WebSocketListener::do_accept() {
    auto onAccepted = beast::bind_front_handler(&WebSocketListener::on_accept, shared_from_this());
    m_acceptor.async_accept(net::make_strand(m_ctx), std::move(onAccepted));
}
// Completion handler for async_accept.
//
// ec     - result of the accept operation.
// socket - the accepted connection (only meaningful when ec is clear).
//
// On an accept error the failure is logged and no further accept is armed.
// On success the next accept is armed first, TCP_NODELAY is applied, and a
// WebSocketSession is created, recorded in m_sessions and started. Exceptions
// (e.g. from the throwing remote_endpoint() overload) are logged and swallowed.
void WebSocketListener::on_accept(beast::error_code ec, tcp::socket socket) {
    try {
        if (ec) {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                            "WebSocketListener::on_accept accept failed, error SocketID: %d, message: %s",
                            socket.native_handle(), ec.message().c_str());
            return;
        }

        // Accept another connection
        do_accept();

        socket.set_option(tcp::no_delay(true), ec);
        if (ec) {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                            "WebSocketListener::on_accept set_option(No delay) failed, error message: %s",
                            ec.message().c_str());
            return;
        }

        // Capture the peer's address before the socket is moved into the session.
        std::string peerAddress = socket.remote_endpoint().address().to_string();
        unsigned short peerPort = socket.remote_endpoint().port();

        // Create the WebSocketSession and remember it (weakly) in the session list.
        auto newSession = std::make_shared<WebSocketSession>(std::move(socket), m_sslctx, m_pEvent);
        m_sessions.emplace_back(newSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::on_accept insert client id : %d", newSession->getId());

        // optionally: garbage collect expired sessions
        m_sessions.remove_if(std::mem_fn(&Handle::expired));

        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
                        "WebSocketListener::on_accept Incoming connection request from SocketID: %d, "
                        "IP: %s Port: %hu",
                        newSession->getId(), peerAddress.c_str(), peerPort);
        newSession->run();
    } catch (std::exception const& e) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::on_accept Inside Catch msg: %s", e.what());
    } catch (...) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketListener::on_accept Inside Generic Catch");
    }
}
//------------------------------------------------------------------------------
// Session class
// Builds a session around an accepted socket.
//
// socket - accepted TCP connection; ownership moves into the websocket stream.
// ctx    - SSL context handed to the stream.
// pEvent - event sink stored in m_pEvent for the session's notifications.
WebSocketSession::WebSocketSession(tcp::socket socket, ssl::context& ctx,
                                   std::shared_ptr<ISocketEvents> pEvent)
    : ws_(std::move(socket), ctx)
    , m_pEvent(pEvent)
{
    // Give both buffers the same initial reservation up front.
    constexpr std::size_t initialReserve = 1000;
    m_ReadBuffer.reserve(initialReserve);
    m_WriteBuffer.reserve(initialReserve);
}
// Destructor: best-effort close of a websocket that is still open, followed by
// a disconnect notification to the event sink.
//
// The try/catch deliberately lives INSIDE the body. With a function-try-block
// on a destructor, reaching the end of a handler implicitly rethrows the
// exception; since destructors are normally noexcept, that rethrow ends in
// std::terminate() instead of swallowing the error. An ordinary try block
// really does stop the exception after it has been logged.
WebSocketSession::~WebSocketSession() {
    try {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::~WebSocketSession SocketID: %d Closing ", getId());
        if (ws_.is_open()) {
            // Non-throwing overload: a failed close is reported through `ec` and logged.
            beast::error_code ec;
            ws_.close(m_closeReason, ec);
            if (ec)
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                                "WebSocketSession::~WebSocketSession SocketID: %d clsoing error msg: %s",
                                getId(), ec.message().c_str());
            // Tell the event sink that this client is gone.
            if (m_pEvent)
                m_pEvent->OnConnect(getId(), false, "", 0);
        }
    } catch (std::exception const& e) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "~WebSocketSession Inside Catch msg: %s", e.what());
    } catch (...) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "~WebSocketSession Inside Generic Catch");
    }
}
// Entry point of a session: hop onto the stream's executor.
//
// Async operations on the I/O objects of this WebSocketSession must run within
// its strand. That is not strictly necessary for single-threaded contexts, but
// the code is written to be thread-safe by default. on_run() is therefore
// dispatched rather than called directly, bound to a shared_ptr that keeps the
// session alive.
void WebSocketSession::run() {
    auto startOnExecutor =
        beast::bind_front_handler(&WebSocketSession::on_run, shared_from_this());
    net::dispatch(ws_.get_executor(), std::move(startOnExecutor));
}
void WebSocketSession::closeSocket() {
auto id = getId();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::closeSocket Closing SocketID: %d", id);
auto self = shared_from_this();
//// ideally we cleanly close, however this will not reliably close the pending read
// net::post(ws_.get_executor(), [this, self] { get_lowest_layer(ws_).cancel(); });
net::post(ws_.get_executor(), [this, self, id] {
// m_closeReason must stay valid!
ws_.async_close(m_closeReason, [self, id](beast::error_code ec) {
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::closeSocket SocketID: %d Message: %s", id,
ec.message().c_str());
}
});
});
}
// Starts the asynchronous operation chain of the session.
// Arms a 30 second timeout on the lowest layer and begins the server-side SSL
// handshake; completion is delivered to on_handshake().
void WebSocketSession::on_run() {
    // Set the timeout.
    auto& lowest = beast::get_lowest_layer(ws_);
    lowest.expires_after(std::chrono::seconds(30));

    // Perform the SSL handshake
    auto onTlsDone =
        beast::bind_front_handler(&WebSocketSession::on_handshake, shared_from_this());
    ws_.next_layer().async_handshake(ssl::stream_base::server, std::move(onTlsDone));
}
// Completion handler of the SSL handshake.
//
// On failure the error is logged and the chain ends here. On success the
// lowest-layer timeout is switched off, the websocket options are configured
// and the HTTP upgrade request is read into m_upgrade_request; on_upgrade()
// continues from there. Exceptions are logged and swallowed.
void WebSocketSession::on_handshake(beast::error_code ec) {
    try {
        if (ec) {
            stl::log::trace(
                stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::on_handshake handshake failed, error SocketID: %d, message: %s", getId(),
                ec.message().c_str());
            return;
        }

        // Turn off the timeout on the tcp_stream, because
        // the websocket stream has its own timeout system.
        beast::get_lowest_layer(ws_).expires_never();

        // Set suggested timeout settings for the websocket
        auto const serverTimeouts =
            websocket::stream_base::timeout::suggested(beast::role_type::server);
        ws_.set_option(serverTimeouts);

        // Set a decorator to change the Server of the handshake
        auto stampServerField = [](websocket::response_type& res) {
            res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async-ssl");
        };
        ws_.set_option(websocket::stream_base::decorator(std::move(stampServerField)));

        // Read the upgrade request over the SSL layer.
        auto onRequestRead =
            beast::bind_front_handler(&WebSocketSession::on_upgrade, shared_from_this());
        http::async_read(ws_.next_layer(), m_ReadBuffer, m_upgrade_request, std::move(onRequestRead));
    } catch (std::exception const& e) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::on_handshake Inside Catch msg: %s", e.what());
    } catch (...) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::on_handshake Inside Generic Catch");
    }
}
// Completion handler of the HTTP read that fetched the upgrade request.
//
// On failure the error is logged and the chain ends. On success the request
// headers are logged, the event sink is told about the new connection, and the
// websocket handshake is accepted asynchronously (continuing in on_accept()).
// The byte count of the read is not used. Exceptions are logged and swallowed.
void WebSocketSession::on_upgrade(beast::error_code ec, size_t) {
    try {
        if (ec) {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                            "WebSocketSession::on_upgrade upgrade failed, error SocketID: %d, message: %s",
                            getId(), ec.message().c_str());
            return;
        }

        // Render the request header block once so it can be logged.
        std::string const requestHeaders =
            boost::lexical_cast<std::string>(m_upgrade_request.base());
        stl::log::trace(
            stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
            "WebSocketSession::on_upgrade Handshake completed for SocketID: %d, WebSocket Headers: %s",
            getId(), requestHeaders.c_str());

        if (m_pEvent)
            m_pEvent->OnConnect(getId(), true, "", 0);

        // Accept the websocket handshake
        auto onAccepted =
            beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this());
        ws_.async_accept(m_upgrade_request, std::move(onAccepted));
    } catch (std::exception const& e) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::on_upgrade Inside Catch msg: %s", e.what());
    } catch (...) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::on_upgrade Inside Generic Catch");
    }
}
// Completion handler of the websocket accept.
// A failed accept ends the chain without any logging; otherwise the read loop
// is started.
void WebSocketSession::on_accept(beast::error_code ec) {
    if (ec)
        return;
    do_read(); // Read a message
}
// Arms the next asynchronous websocket read into m_ReadBuffer.
//
// The first time it runs with an event sink present, it also flips bHandShake
// and reports the completed handshake through onHandShake(). Exceptions are
// logged and swallowed.
void WebSocketSession::do_read() {
    try {
        bool const mustAnnounce = m_pEvent && !bHandShake;
        if (mustAnnounce) {
            bHandShake = true;
            m_pEvent->onHandShake(getId(), bHandShake);
        }

        // Read a message into our buffer
        auto onMessage =
            beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this());
        ws_.async_read(m_ReadBuffer, std::move(onMessage));
    } catch (std::exception const& e) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::do_read Inside Catch msg: %s", e.what());
    } catch (...) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::do_read Inside Generic Catch");
    }
}
// Completion handler of a websocket read.
//
// ec                - result of the read; any error ends the read loop (a
//                     close by the client is additionally logged).
// bytes_transferred - number of bytes the read appended to m_ReadBuffer.
//
// The buffered bytes are copied into a string and the buffer is consumed BEFORE
// the event sink is invoked. If the buffer were released only after
// OnReceive() returned, a throwing callback would leave the message in
// m_ReadBuffer; the next read would append behind it and every later callback
// would receive stale data. Exceptions are logged and swallowed, and the next
// read is armed afterwards.
void WebSocketSession::on_read(beast::error_code ec, size_t bytes_transferred) {
    try {
        // This indicates that the WebSocketSession was closed
        if (ec) {
            if (ec == websocket::error::closed) {
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
                                "WebSocketSession::on_read Client closed SocketID: %d", getId());
            }
            return;
        }
        if (auto iLen = m_ReadBuffer.cdata().size()) {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
                            "WebSocketSession::on_read Client SocketID: %d dataRecv: %zu cpacity: %zu",
                            getId(), iLen, m_ReadBuffer.capacity());
            std::string data(net::buffer_cast<char const*>(m_ReadBuffer.cdata()), iLen);
            // The payload now lives in `data`; release the buffer before user code runs.
            m_ReadBuffer.consume(bytes_transferred);
            if (m_pEvent)
                m_pEvent->OnReceive(getId(), data.data(), iLen);
        } else {
            m_ReadBuffer.consume(bytes_transferred);
        }
    } catch (std::exception const& e) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::on_read Inside Catch msg: %s", e.what());
    } catch (...) {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                        "WebSocketSession::on_read Inside Generic Catch");
    }
    // Keep the read loop going, also after a swallowed exception.
    do_read();
}
// Returns the native handle of the socket beneath the websocket stream; the
// rest of the code uses it as the session/client id.
SOCKET WebSocketSession::getId() {
    auto& lowest = beast::get_lowest_layer(ws_);
    return lowest.socket().native_handle();
}
void WebSocketSession::SendMessage(std::string strMessage) {
net::post(ws_.get_executor(), [this, msg = std::move(strMessage), self=shared_from_this()] {
try {
if (msg.length() > 0) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, "
"BufferCapacity: %zu msgLen:%zud, msg:%s",
getId(), m_WriteBuffer.size(), m_WriteBuffer.capacity(), msg.length(),
msg.c_str());
if (m_WriteBuffer.size() > 40960) // Pending data size is greater than 40 MB
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, "
"Disconnecting client due to pending buffer limit crossed",
getId(), m_WriteBuffer.size());
} else {
boost::beast::ostream(m_WriteBuffer) << msg << ":";
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, "
"BufferCapacity: %zu",
getId(), m_WriteBuffer.size(), m_WriteBuffer.capacity());
ws_.async_write(m_WriteBuffer.data(), [this](beast::error_code ec, size_t transferred) {
boost::ignore_unused(transferred);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage callback error SocketID: %d, "
"ErrorMsg:%s" /* ,Message: %s"*/,
getId(), ec.message().c_str() /*,msg.c_str()*/);
}
m_WriteBuffer.consume(transferred);
});
}
}
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage Inside Generic Catch");
}
});
}
// Demo driver: creates a listener on port 8989 via InitializeWebSocketListener,
// lets it run for ten seconds, then stops it.
int main() {
    auto const demoDuration = std::chrono::seconds(10);

    auto events = std::make_shared<ISocketEvents>();
    auto listener = InitializeWebSocketListener(8989, 50, "CERT", "KEY", events);

    std::this_thread::sleep_for(demoDuration);
    listener->Stop();
}
本地演示

总结与注意事项
已涉及的方面:生命周期(参见 `m_closeReason`,了解其中更微妙的一处)以及关闭。剩余问题:
- `closeSocket()` 在一定程度上取决于对等方是否按照 WebSocket 规范积极配合。您可能需要扩展这段逻辑,以便最终强制关闭套接字。
- `SendMessage` 的安全性:目前完全有可能发出相互重叠的写入。这……不符合规范。您可以在其中加入一个传出消息队列来解决这个问题。