我正在使用 boost 库开发 UDP 服务器。服务器必须跟踪“已连接”的用户。如果服务器在一定时间限制(本例中为 5 秒)内未收到消息,则这些将被视为“未连接”。当客户端超时时,必须将其从客户端列表中删除。
服务器工作正常并且我收到消息,但是当客户端断开连接时应用程序崩溃。我发现了并发平面映射的擦除函数中的问题,但我无法理解为什么会发生......下面是重现错误的最小代码:
#include <stdio.h>
#include <iostream>
#include <string>
#include <chrono>
#include <condition_variable>
#include <atomic>
#include <unistd.h>
#include <thread>
#include <functional>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/unordered/concurrent_flat_map.hpp>
#include <boost/lockfree/spsc_queue.hpp>
class UdpClient
{
public:
// Constructor
UdpClient(int recv_id, std::function<void(int)> cb) :
m_queue(100)
{
this->m_recv_id = recv_id;
this->m_on_disconnected = cb;
this->m_proc_thread = std::thread(std::bind(&UdpClient::handle_queue, this));
}
// Enqueue
void enqueue(unsigned char* buffer)
{
if (this->m_queue.write_available() > 0)
{
this->m_queue.push(buffer);
this->m_proc_cond.notify_one();
}
}
private:
int m_recv_id;
std::function<void(int)> m_on_disconnected;
boost::lockfree::spsc_queue<unsigned char*> m_queue;
std::condition_variable m_proc_cond;
std::mutex m_proc_mutex;
std::thread m_proc_thread;
// Handle queue
void handle_queue()
{
while (true)
{
while (this->m_queue.read_available() > 0)
{
this->m_queue.consume_one(
[&] (unsigned char* buffer)
{
// Process
}
);
}
std::unique_lock<std::mutex> lk(this->m_proc_mutex);
auto now = std::chrono::system_clock::now();
auto timeout = now + std::chrono::seconds(5);
if (!this->m_proc_cond.wait_until(lk, timeout, [&] ()
{
return this->m_queue.read_available() > 0;
}))
{
std::cout << "timeout occurs\n";
break;
}
}
// Cleanup messages queue
this->m_queue.consume_all([] (unsigned char* buffer) { delete buffer; });
// Notify disconnection
this->m_on_disconnected(this->m_recv_id);
}
};
class UdpServer
{
public:
// Constructor
UdpServer(int port) :
m_ioservice(),
m_clients (5)
{
this->m_port = port;
this->m_socket = std::make_shared<boost::asio::ip::udp::socket>(m_ioservice);
}
// Listen
void listen()
{
auto endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), this->m_port);
this->m_socket->open(endpoint.protocol());
this->m_socket->set_option(boost::asio::ip::udp::socket::reuse_address(true));
this->m_socket->set_option(boost::asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT>(true));
this->m_socket->set_option(boost::asio::ip::udp::socket::keep_alive(true));
this->m_socket->bind(endpoint);
this->handle_receive();
this->m_ioservice.run();
}
private:
int m_port;
boost::asio::io_service m_ioservice;
std::shared_ptr<boost::asio::ip::udp::socket> m_socket;
std::thread m_listen_thread;
boost::concurrent_flat_map<int32_t, std::shared_ptr<UdpClient>> m_clients;
std::array<unsigned char, 1500> m_buffer {};
boost::asio::ip::udp::endpoint m_remote_endpoint;
// Handle receive
void handle_receive()
{
this->m_socket->async_receive_from(
boost::asio::buffer(this->m_buffer, 1500),
this->m_remote_endpoint,
[this] (boost::system::error_code const & err, size_t num_bytes)
{
if (num_bytes > 0)
{
int recv_id = 1;
unsigned char buffer[num_bytes];
std::copy_n(std::begin(this->m_buffer), num_bytes, buffer);
if (this->m_clients.contains(recv_id))
{
this->m_clients.visit(recv_id,
[&] (std::pair<const int, std::shared_ptr<UdpClient>> &x)
{
x.second->enqueue(buffer);
}
);
}
else
{
auto client = std::make_shared<UdpClient>(recv_id,
[&] (int id)
{
std::cout << "received disconnect signal\n";
if (this->m_clients.contains(id))
{
// ---------- ERROR HERE ----------
this->m_clients.erase(id);
// --------------------------------
}
}
);
this->m_clients.insert(std::pair<int32_t, std::shared_ptr<UdpClient>>(recv_id, client));
client->enqueue(buffer);
std::cout << "connected clients = " << this->m_clients.size() << "\n";
}
}
// Continue receiving...
this->handle_receive();
}
);
}
};
int main(int argc, char * argv[])
{
auto server = new UdpServer(9000);
server->listen();
return 0;
}
编译:
g++ -o test main.cpp -lboost_system -lboost_date_time -lboost_thread -ltb
测试它(在 Linux 上):
nc -u 127.0.0.1 9000 < /dev/random
运行1秒并终止进程
注意:我正在使用最新版本的boost(1.85.0)
我尝试使用 try ... catch 但不起作用...
解决了!好像有人锁了
shared_ptr
。切换自:
boost::concurrent_flat_map<int32_t, std::shared_ptr<UdpClient>>
至:
boost::concurrent_flat_map<int32_t, UdpClient*>
解决了问题