从 boost::concurrent_flat_map 中擦除会导致崩溃

问题描述 投票:0回答:1

我正在使用 boost 库开发 UDP 服务器。服务器必须跟踪“已连接”的用户。如果服务器在一定时间限制(本例中为 5 秒)内未收到某个用户的消息,则该用户将被视为“未连接”。当客户端超时时,必须将其从客户端列表中删除。

服务器工作正常,我也能收到消息,但是当客户端断开连接时应用程序会崩溃。我发现问题出在 concurrent_flat_map 的 erase 函数调用上,但我无法理解为什么会这样……下面是重现错误的最小代码:

#include <stdio.h>
#include <unistd.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/unordered/concurrent_flat_map.hpp>
#include <boost/lockfree/spsc_queue.hpp>


class UdpClient
{
    public:
        // Constructor
        UdpClient(int recv_id, std::function<void(int)> cb) :
            m_queue(100)
        {
            this->m_recv_id = recv_id;
            this->m_on_disconnected = cb;
            this->m_proc_thread = std::thread(std::bind(&UdpClient::handle_queue, this));
        }

        // Enqueue
        void enqueue(unsigned char* buffer) 
        {   
            if (this->m_queue.write_available() > 0)
            {
                this->m_queue.push(buffer);
                this->m_proc_cond.notify_one();
            }
        }

    private:

        int m_recv_id;    
        std::function<void(int)> m_on_disconnected;
        boost::lockfree::spsc_queue<unsigned char*> m_queue;
        std::condition_variable m_proc_cond;
        std::mutex              m_proc_mutex;
        std::thread             m_proc_thread;

        // Handle queue
        void handle_queue()
        {
            while (true) 
            {       
                while (this->m_queue.read_available() > 0)            
                {
                    this->m_queue.consume_one(
                        [&] (unsigned char* buffer)
                        {
                            // Process                  
                        }
                    );
                } 
              
                std::unique_lock<std::mutex> lk(this->m_proc_mutex);
                auto now     = std::chrono::system_clock::now();
                auto timeout = now + std::chrono::seconds(5);
                if (!this->m_proc_cond.wait_until(lk, timeout, [&] ()
                {
                    return this->m_queue.read_available() > 0;
                })) 
                {   
                    std::cout << "timeout occurs\n";
                    break;
                }
            } 
            
            // Cleanup messages queue
            this->m_queue.consume_all([] (unsigned char* buffer) { delete buffer; });

            // Notify disconnection
            this->m_on_disconnected(this->m_recv_id);
        }

};

class UdpServer
{
    public:
        // Constructor
        UdpServer(int port) : 
            m_ioservice(),    
            m_clients  (5)
        {
            this->m_port = port;
            this->m_socket = std::make_shared<boost::asio::ip::udp::socket>(m_ioservice);
        }

        // Listen
        void listen()
        {
            auto endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), this->m_port);
            this->m_socket->open(endpoint.protocol());
            this->m_socket->set_option(boost::asio::ip::udp::socket::reuse_address(true));
            this->m_socket->set_option(boost::asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT>(true));
            this->m_socket->set_option(boost::asio::ip::udp::socket::keep_alive(true));
            this->m_socket->bind(endpoint);
            this->handle_receive();
            this->m_ioservice.run();
        }
    
    private:

        int m_port;
        boost::asio::io_service m_ioservice;    
        std::shared_ptr<boost::asio::ip::udp::socket> m_socket;   
        std::thread m_listen_thread;      
        boost::concurrent_flat_map<int32_t, std::shared_ptr<UdpClient>> m_clients; 
        std::array<unsigned char, 1500> m_buffer {};
        boost::asio::ip::udp::endpoint m_remote_endpoint;

        // Handle receive
        void handle_receive() 
        {  
            this->m_socket->async_receive_from(
                boost::asio::buffer(this->m_buffer, 1500), 
                this->m_remote_endpoint,
                [this] (boost::system::error_code const & err, size_t num_bytes)
                {    
                    if (num_bytes > 0) 
                    {                
                        int recv_id = 1;
                        unsigned char buffer[num_bytes];
                        std::copy_n(std::begin(this->m_buffer), num_bytes, buffer);
                                                
                        if (this->m_clients.contains(recv_id))
                        {
                            this->m_clients.visit(recv_id,
                                [&] (std::pair<const int, std::shared_ptr<UdpClient>> &x)
                                {
                                    x.second->enqueue(buffer);
                                }
                            );                            
                        }
                        else
                        {
                            auto client = std::make_shared<UdpClient>(recv_id,
                                [&] (int id)
                                {
                                    std::cout << "received disconnect signal\n";
                                    if (this->m_clients.contains(id))
                                    {        
                                        // ---------- ERROR HERE ----------
                                        this->m_clients.erase(id);
                                        // --------------------------------
                                    }      
                                }
                            );
                            this->m_clients.insert(std::pair<int32_t, std::shared_ptr<UdpClient>>(recv_id, client));
                            client->enqueue(buffer);

                            std::cout << "connected clients = " << this->m_clients.size() << "\n";
                        }
                    }

                    // Continue receiving...
                    this->handle_receive();
                }
            );    
        }

};



int main(int argc, char * argv[])
{    
    auto server = new UdpServer(9000);
    server->listen();

    return 0;
}

编译:

g++ -o test main.cpp -lboost_system -lboost_date_time -lboost_thread -ltb

测试它(在 Linux 上):

nc -u 127.0.0.1 9000 < /dev/random

运行1秒并终止进程

注意:我正在使用最新版本的boost(1.85.0)

我尝试使用 try ... catch 但不起作用...

c++ boost
1个回答
0
投票

解决了!好像有什么东西锁住了 shared_ptr。将映射类型从:

boost::concurrent_flat_map<int32_t, std::shared_ptr<UdpClient>>

至:

boost::concurrent_flat_map<int32_t, UdpClient*>

解决了问题

© www.soinside.com 2019 - 2024. All rights reserved.