Boost.Asio:UDP 客户端的效率

问题描述 投票:0回答:1

我已经使用多线程环境实现了 udp 会话。

    /// Raw storage behind a StaticBuffer: a fixed array of 65000 bytes.
    using RawDataArray=std::array <unsigned char,65000>;

    /**
     * Fixed-capacity byte buffer that tracks how many bytes are in use.
     *
     * The storage is a RawDataArray member, so the object itself is large.
     * size() reports the bytes in use; max_size() reports the capacity.
     * Every size given to this class is limited to the capacity, so size()
     * can never describe more bytes than the storage holds.
     */
    class StaticBuffer
    {
    private:
        RawDataArray                            m_data;
        std::size_t                             m_n_avail;

        /// Limits a requested size to the capacity of the backing array.
        static std::size_t clamp_size(std::size_t n)
        {
            const std::size_t capacity=std::tuple_size<RawDataArray>::value;
            return n<capacity?n:capacity;
        }
    public:

        /// Creates an empty, zero-filled buffer.
        StaticBuffer():m_data(),m_n_avail(0){}

        /// Creates a zero-filled buffer whose size is n_bytes (limited to the capacity).
        StaticBuffer(std::size_t n_bytes):m_data(),m_n_avail(clamp_size(n_bytes)){}

        /// Copies storage and size from other; logs the copy to std::cout.
        StaticBuffer(const StaticBuffer& other):m_data(other.m_data),m_n_avail(other.m_n_avail)
        {
            std::cout<<"ctor cpy\n";
        }

        /// Copies the storage of other but uses n_bytes (limited to the capacity) as size.
        StaticBuffer(const StaticBuffer& other,std::size_t n_bytes):m_data(other.m_data),m_n_avail(clamp_size(n_bytes))
        {
            std::cout<<"ctor cpy\n";
        }

        /// Copies a raw array and uses n_bytes (limited to the capacity) as size.
        StaticBuffer(const RawDataArray& data,std::size_t n_bytes):m_data(data),m_n_avail(clamp_size(n_bytes))
        {
            std::cout<<"ctor static buff\n";
        }

        /// Sets the number of bytes in use; negative values count as 0, large values as the capacity.
        void set_size(int n)
        {
            m_n_avail=n<0?0:clamp_size(static_cast<std::size_t>(n));
        }

        /// Marks the whole storage as in use (size() == max_size()).
        void set_max_size(){m_n_avail=m_data.size();}

        /// Capacity of the buffer in bytes.
        std::size_t max_size()const {return m_data.size();}

        /// Unchecked element access; i must be below max_size().
        unsigned char& operator[](std::size_t i){return m_data[i];}
        const unsigned char& operator[] (std::size_t i)const{return m_data[i];}

        /// Copy-assigns storage and size; self-assignment is a no-op.
        StaticBuffer& operator=(const StaticBuffer& other)
        {
            if (this == &other)
                return *this;
            m_data = other.m_data;
            m_n_avail = other.m_n_avail;
            return *this;
        }

        /**
         * Appends one byte after the bytes currently in use.
         *
         * Throws the string literal "Out of memory" (a const char*) when the
         * buffer is already full; the thrown type is unchanged for callers.
         */
        void push_back(unsigned char val)
        {
            if (m_n_avail<m_data.size())
            {
                m_data[m_n_avail]=val;
                ++m_n_avail; // the stored byte must also be counted as in use
            }else
                throw "Out of memory";
        }

        /// Forgets all bytes in use; the storage content is left as it is.
        void reset(){m_n_avail=0;}

        /// Pointer to the first byte of the storage.
        unsigned char* data(){return m_data.data();}
        const unsigned char* data()const {return m_data.data();}

        /// Number of bytes in use.
        std::size_t size()const{return m_n_avail;}

        ~StaticBuffer()=default;
    };
    
        class UDPSeassion;
        using DataBuffer = StaticBuffer;
        using DataBufferPtr=std::unique_ptr<DataBuffer>;
        using ExternalReadHandler=std::function<void(DataBufferPtr)>;
        
        class UDPSeassion:public std::enable_shared_from_this<UDPSeassion>
        {
        private:
            asio::io_context&           m_ctx;
            asio::ip::udp::socket       m_socket;
            asio::ip::udp::endpoint     m_endpoint;
            std::string                 m_addr;
            unsigned short              m_port;
        
            asio::io_context::strand    m_send_strand;
            std::deque<DataBufferPtr>   m_dq_send;
        
        
            asio::io_context::strand    m_rcv_strand;
            DataBufferPtr               m_rcv_data;
        
            ExternalReadHandler         external_rcv_handler;
        
        private:
            void do_send_data_from_dq()
            {
                if (m_dq_send.empty())
                    return;
        
                m_socket.async_send_to(
                            asio::buffer(m_dq_send.front()->data(),m_dq_send.front()->size()),
                            m_endpoint,
                            asio::bind_executor(m_send_strand,[this](const boost::system::error_code& er, std::size_t bytes_transferred){
                    if (!er)
                    {
                        m_dq_send.pop_front();
                        do_send_data_from_dq();
        
                    }else
                    {
                        //post to loggger
                    }
                }));
            }
        
            void do_read(const boost::system::error_code& er, std::size_t bytes_transferred)
            {
                if (!er)
                {
                    m_rcv_data->set_size(bytes_transferred);
                    asio::post(m_ctx,[this,data=std::move(m_rcv_data)]()mutable{ external_rcv_handler(std::move(data));});
                    m_rcv_data=std::make_unique<DataBuffer>();
                    m_rcv_data->set_max_size();
                    async_read();
                }
            }
        
        public:
        
            UDPSeassion(asio::io_context& ctx,const std::string& addr, unsigned short port):
                m_ctx(ctx),
                m_socket(ctx),
                m_endpoint(asio::ip::address::from_string(addr),port),
                m_addr(addr),
                m_port(port),
                m_send_strand(ctx),
                m_dq_send(),
                m_rcv_strand(ctx),
                m_rcv_data(std::make_unique<DataBuffer>(65000))
        
            {}
            ~UDPSeassion(){}
        
            const std::string& get_host()const{return m_addr;}
            unsigned short get_port(){return m_port;}
            template<typename ExternalReadHandlerCallable>
            void set_read_data_headnler(ExternalReadHandlerCallable&& handler)
            {
                external_rcv_handler=std::forward<ExternalReadHandlerCallable>(handler);
            }
            void start()
            {
                m_socket.open(asio::ip::udp::v4());
                async_read();
            }
        
            void async_read()
            {
                m_socket.async_receive_from(
                            asio::buffer(m_rcv_data->data(),m_rcv_data->size()),
                            m_endpoint,
                            asio::bind_executor(m_rcv_strand,std::bind(&UDPSeassion::do_read,this,std::placeholders::_1,std::placeholders::_2) )
                            );
            }
        
            void async_send(DataBufferPtr pData)
            {
                asio::post(m_ctx,
                           asio::bind_executor(m_send_strand,[this,pDt=std::move(pData)]()mutable{
                                                                                                m_dq_send.emplace_back(std::move(pDt));
                                                                                                if (m_dq_send.size()==1)
                                                                                                    do_send_data_from_dq();
                                                                                                }));
            }
        };

/**
 * Example receive callback: prints the calling thread id, the payload and
 * the payload size to std::cout.
 *
 * @param pdata received datagram; a null pointer is ignored
 */
void handler_read(DataBufferPtr pdata)
{
    // decoding raw_data -> decod_data
    // lock mutext
    // queue.push_back(decod_data)
    // unlock mutext

    if (!pdata)
        return;

    //for view pdata
    std::stringstream ss;
    ss<<"thread handler: "<<std::this_thread::get_id()<<" ";
    // Write exactly size() bytes. Streaming the unsigned char* would treat the
    // buffer as a C string and read until the first NUL byte instead.
    ss.write(reinterpret_cast<const char*>(pdata->data()),static_cast<std::streamsize>(pdata->size()));
    ss<<" "<<pdata->size()<<std::endl;
    std::cout<<ss.str()<<std::endl;
}
/**
 * Demo driver: sends the 4-byte datagram "ABC\n" to 127.0.0.1:11223 and
 * runs the io_context on three worker threads plus the main thread.
 */
int main()
{
    asio::io_context ctx;
    //auto work_guard = asio::make_work_guard(ctx);
    std::cout<<"MAIN thread: "<<std::this_thread::get_id()<<std::endl;
    StaticBuffer b{4};
    b[0]='A';
    b[1]='B';
    b[2]='C';
    b[3]='\n'; // last byte of the 4-byte payload (index 4 would be outside it)

    UDPSeassion client(ctx,"127.0.0.1",11223);
    client.set_read_data_headnler(handler_read);
    client.start(); // queues the first receive, so ctx.run() below has work to do

    std::vector<std::thread> threads;

    for (int i=0;i<3;++i)
    {
        threads.emplace_back([&](){
            std::stringstream ss;
            ss<<"run thread: "<<std::this_thread::get_id()<<std::endl;
            std::cout<<ss.str();
            ctx.run();
            std::cout<<"end thread\n";
        }
        );
    }

    client.async_send(std::make_unique<StaticBuffer>(b));
    ctx.run();

    for (auto& t:threads)
        t.join();

    return 0; // success; a non-zero exit status would signal failure
}

在上面的代码中,主要强调的是UDPSeasion类。编写 StaticBuffer 类是为了执行主要功能。我有一些问题:

  1. 假设此类将被集成到一个工作频率约为 100 Hz 的系统中:每 10 ms,系统会通过该客户端发送一次自身状态。
     1.1 这个实现在多线程环境下是否正确?效率如何?
     1.2 如果客户端内部只用一个线程同时负责读和写,效率又如何?(示例)
  2. 任务之间的缓冲区传输是否正确? (std::move(unique_ptr_data))
  3. 实际中,给客户端多少个线程来处理读写?
  4. 对于 TCP 客户端?

我将非常感谢您对我的问题的详细回答。非常感谢))

c++ boost boost-asio
1个回答
0
投票

我会简化很多。

  • 您“使用”了 `enable_shared_from_this`,但没有任何异步操作捕获 `shared_from_this()`。事实上,您甚至没有通过 `shared_ptr` 来分配 `UDPSession`,因此即便调用 `shared_from_this()` 也会是未定义行为。

  • 空操作(no-op)的析构函数本来就会被隐式生成。如果确实必须声明它们,请写成 `= default`。

  • `m_rcv_strand` 所用的 `io_context::strand` 类型已弃用——请改用 `strand<>`。

  • 为什么要有单独的发送/接收 strand?当然,允许 1 个读取操作与 1 个写入操作重叠,但是如果没有适当的同步,您仍然不能并发访问共享对象(如 `m_socket`)。

  • 您使用了 strand,但似乎错误地没有在相关的地方 post 到它们上(例如 `post(m_ctx, bind_executor(m_send_strand, ....))` 就是自相矛盾的)。

  • 您有一个颇费功夫的缓冲区类型,*似乎*旨在避免内存分配,但无论如何您还是把它包装在了 `unique_ptr` 中 ¯\_(ツ)_/¯

  • `set_read_data_handler` 不需要是模板。既然最终都会被类型擦除为 `std::function`,那么与直接使用下面的写法相比,模板带来的好处为零:

     // Stores the receive callback. The argument is taken by value and moved
     // into the member, so no template parameter is needed.
     void set_read_data_handler(ExternalReadHandler handler) {
          external_rcv_handler = std::move(handler);
     }
    
  • 您有重复的魔法常数(例如 `65000`)。

  • 您似乎缺少一个套接字

    bind()
    调用

简而言之,我会用一些合理的东西替换缓冲区:

// Byte container with a compile-time capacity of 65000 elements.
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
// Compile-time check that the alias reports the intended capacity.
static_assert(StaticBuffer::static_capacity == 65000);

由于您似乎期望使用文本协议,您的消息平均长度很可能(远)小于这个容量,所以我认为仅使用 `std::string` 甚至 `boost::container::small_vector<...>` 可能会更快。

并不是真正必需的,但为了优雅的、asio 标准的使用:

// Make asio's own buffer() overloads visible next to the StaticBuffer overloads below.
using asio::buffer;
// Buffer over the b.size() elements of a const StaticBuffer, starting at b.data().
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
// Same for a non-const StaticBuffer; only size() elements are covered, not the full capacity.
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }

查看简化版本 Live On Coliru

#include <boost/asio.hpp>
#include <boost/container/static_vector.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
#include <list>
#include <thread>

namespace { // console tracing helpers
    // Guards std::cout so that lines from different threads do not interleave.
    std::mutex      s_console_mx;
    // Source of the small per-thread ids shown in the "th:<id>" prefix.
    std::atomic_int t_id_gen{0};
    // Id of the current thread, taken from the counter on first use in that thread.
    thread_local int t_id = ++t_id_gen;

    // Prints "th:<id> " followed by every argument and a flushing newline, under the console lock.
    template <typename... Args> void trace(Args const&... args) {
        std::lock_guard<std::mutex> guard(s_console_mx);
        std::cout << "th:" << t_id << " ";
        (std::cout << ... << args);
        std::cout << std::endl;
    }
} // namespace

// Short names for the Boost.Asio namespace and its UDP protocol class.
namespace asio = boost::asio;
using asio::ip::udp;

// Byte container with a compile-time capacity of 65000 elements.
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);

// not really required but to allow for elegant, asio-standard use:
// buffer(x) works for StaticBuffer the same way it does for asio's own overloads.
using asio::buffer;
// Both overloads cover exactly b.size() elements starting at b.data().
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }

// Callback invoked with each received datagram, passed as an rvalue.
using ExternalReadHandler = std::function<void(StaticBuffer&&)>;

// UDP session built on one socket whose executor is a strand over m_ex.
//
// start() binds the socket to addr:port. The same m_endpoint object is also
// passed as the destination of every send and as the endpoint argument of
// every receive.
// NOTE(review): all handlers capture a raw `this`; presumably the object must
// outlive every pending operation - confirm against the caller.
class UDPSession {
  private:
    using error_code = boost::system::error_code;
    asio::any_io_executor m_ex;
    std::string           m_addr;
    uint16_t              m_port;

    // Initialised from m_addr/m_port, which are declared (and so initialised) earlier.
    udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
    // The socket's executor is a strand created over m_ex.
    udp::socket   m_socket{make_strand(m_ex)};

    std::deque<StaticBuffer> m_dq_send;            // datagrams waiting to be sent
    StaticBuffer             m_rcv_data;           // buffer used by the receive in flight
    ExternalReadHandler      external_rcv_handler; // user callback for received datagrams

  public:
    UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
        : m_ex(ex)
        , m_addr(addr)
        , m_port(port) {}

    std::string const& get_host() const { return m_addr; }
    uint16_t           get_port() { return m_port; }
    // Stores the callback that on_read() posts for each received datagram.
    void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }

    // Opens the socket for IPv4, binds it to m_endpoint and starts the first receive.
    void start() {
        m_socket.open(udp::v4());
        m_socket.bind(m_endpoint);
        do_read();
    }

    // Queues `data` for sending. The queue is only modified inside the task
    // posted to the socket's executor.
    void send(StaticBuffer data) {
        asio::post(m_socket.get_executor(), [this, d = std::move(data)]() mutable {
            m_dq_send.emplace_back(std::move(d));
            // Size 1 means the queue was empty before, so no send chain is running yet.
            if (m_dq_send.size() == 1)
                send_loop();
        });
    }

  private:
    // Starts one asynchronous receive into m_rcv_data.
    void do_read() {
        // Grow the buffer to its full capacity (zero-filled) so the receive can use all of it.
        m_rcv_data.assign(m_rcv_data.static_capacity, '\0');
        m_socket.async_receive_from(
            buffer(m_rcv_data), m_endpoint,
            std::bind(&UDPSession::on_read, this, std::placeholders::_1, std::placeholders::_2));
    }

    // Receive completion handler. On success the buffer is shrunk to the received
    // length and moved into a task posted to m_ex, which calls the user callback;
    // then the next receive is started. On error nothing happens, so no further
    // receive is started.
    void on_read(error_code er, size_t bytes_transferred) {
        if (!er) {
            m_rcv_data.resize(bytes_transferred);
            asio::post(m_ex, [this, data = std::move(m_rcv_data)]() mutable {
                external_rcv_handler(std::move(data));
            });
            do_read();
        }
    }

    // Sends the front of the queue to m_endpoint. On success the element is removed
    // and the loop continues; on error the element stays at the front and no new
    // send is started from here.
    void send_loop() {
        if (m_dq_send.empty())
            return;

        m_socket.async_send_to(buffer(m_dq_send.front()), m_endpoint,
                               [this](error_code er, size_t /*bytes_transferred*/) {
                                   if (!er) {
                                       m_dq_send.pop_front();
                                       send_loop();
                                   } // else { /* post to loggger */ }
                               });
    }
};

// Example receive callback: traces the datagram size and its content as quoted text.
// A single trailing '\n' is left out of the text; empty datagrams are ignored.
void handler_read(StaticBuffer&& pdata) {
    if (pdata.empty())
        return;

    // Drop the last byte only when it really is a newline, so a datagram
    // without a terminator is not truncated.
    auto text_end = pdata.end();
    if (pdata.back() == '\n')
        --text_end;

    std::string const msg(pdata.begin(), text_end);
    trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}

int main() {
    asio::io_context ctx;
    auto work_guard = asio::make_work_guard(ctx);

    trace("Main thread");

    std::list<std::thread> threads;

    for (int i = 0; i < 3; ++i)
        threads.emplace_back([&]() {
            trace("START");
            ctx.run();
            trace("END");
        });

    UDPSession client(ctx.get_executor(), "127.0.0.1", 11223);
    client.set_read_data_handler(handler_read);
    client.start();
    client.send({'A', 'B', 'C', '\n'});

    work_guard.reset();

    for (auto& t : threads)
        t.join();
}

Coliru 上的现场演示“吃掉”了

main.cpp
中的单词。这是本地词典演示:

额外:线程池,
shared_from_this

您可能已经注意到我改为

any_io_executor
而不是
io_context&
。这样您就可以轻松切换到
asio::thread_pool
,而不是手动执行(效果很差)。

我们也重新引入了 `shared_from_this`,但这次用法是正确的。

为了简单起见,我仅把静态缓冲区用作接收缓冲区(因为数据报协议就是这样工作的),而 `DataBuffer` 只使用 `vector`(或 `small_vector`)。

在 Coliru 上查看在线演示(Live On Coliru)

#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>

namespace { // console tracing helpers
    // Guards std::cout so that lines from different threads do not interleave.
    std::mutex      s_console_mx;
    // Source of the small per-thread ids shown in the "th:<id>" prefix.
    std::atomic_int t_id_gen{0};
    // Id of the current thread, taken from the counter on first use in that thread.
    thread_local int t_id = ++t_id_gen;

    // Prints "th:<id> " followed by every argument and a flushing newline, under the console lock.
    template <typename... Args> void trace(Args const&... args) {
        std::lock_guard<std::mutex> guard(s_console_mx);
        std::cout << "th:" << t_id << " ";
        (std::cout << ... << args);
        std::cout << std::endl;
    }
} // namespace

// Short names for the Boost.Asio namespace and its UDP protocol class.
namespace asio = boost::asio;
using asio::ip::udp;

// Payload type for sent and received datagrams; the commented-out line is an alternative.
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
// Callback invoked with each received datagram, passed as an rvalue.
using ExternalReadHandler = std::function<void(DataBuffer&&)>;

// UDP session built on one socket whose executor is a strand over m_ex.
//
// Every asynchronous handler holds a shared_ptr obtained from
// shared_from_this(), so the object has to be owned by a std::shared_ptr
// (main() below creates it with std::make_shared).
// start() binds the socket to addr:port. The same m_endpoint object is also
// passed as the destination of every send and as the endpoint argument of
// every receive.
class UDPSession : public std::enable_shared_from_this<UDPSession> {
  private:
    using error_code = boost::system::error_code;
    asio::any_io_executor m_ex;
    std::string           m_addr;
    uint16_t              m_port;

    // Initialised from m_addr/m_port, which are declared (and so initialised) earlier.
    udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
    // The socket's executor is a strand created over m_ex.
    udp::socket   m_socket{make_strand(m_ex)};

    std::deque<DataBuffer>     m_dq_send;            // datagrams waiting to be sent
    std::array<uint8_t, 65000> m_rcv_data;           // fixed receive buffer, reused for every receive
    ExternalReadHandler        external_rcv_handler; // user callback for received datagrams

  public:
    UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
        : m_ex(ex)
        , m_addr(addr)
        , m_port(port) {}

    std::string const& get_host() const { return m_addr; }
    uint16_t           get_port() { return m_port; }
    // Stores the callback that on_read() posts for each received datagram.
    void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }

    // Opens the socket for IPv4, binds it to m_endpoint and starts the first receive.
    void start() {
        m_socket.open(udp::v4());
        m_socket.bind(m_endpoint);
        do_read();
    }

    // Queues `data` for sending. The queue is only modified inside the task
    // posted to the socket's executor; `self` keeps the session alive until it ran.
    void send(DataBuffer data) {
        asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
            m_dq_send.emplace_back(std::move(d));
            // Size 1 means the queue was empty before, so no send chain is running yet.
            if (m_dq_send.size() == 1)
                send_loop();
        });
    }

  private:
    // Starts one asynchronous receive into the fixed buffer; the bound
    // shared_ptr keeps the session alive while the receive is pending.
    void do_read() {
        using namespace std::placeholders;
        m_socket.async_receive_from( //
            asio::buffer(m_rcv_data), m_endpoint,
            std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
    }

    // Receive completion handler. On success the received bytes are copied into a
    // DataBuffer owned by a task posted to m_ex, which calls the user callback;
    // then the next receive is started. On error no further receive is started.
    void on_read(error_code er, size_t bytes_transferred) {
        if (!er) {
            asio::post(
                m_ex,
                [this, self=shared_from_this(),
                 data = DataBuffer(m_rcv_data.data(), m_rcv_data.data() + bytes_transferred)]() mutable {
                    external_rcv_handler(std::move(data));
                });
            do_read();
        }
    }

    // Sends the front of the queue to m_endpoint. On success the element is removed
    // and the loop continues; on error the element stays at the front and no new
    // send is started from here.
    void send_loop() {
        if (m_dq_send.empty())
            return;

        m_socket.async_send_to( //
            asio::buffer(m_dq_send.front()), m_endpoint,
            [this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
                if (!er) {
                    m_dq_send.pop_front();
                    send_loop();
                } // else { /* post to loggger */ }
            });
    }
};

// Example receive callback: traces the datagram size and its content as quoted text.
// A single trailing '\n' is left out of the text; empty datagrams are ignored.
void handler_read(DataBuffer&& pdata) {
    if (pdata.empty())
        return;

    // Drop the last byte only when it really is a newline, so a datagram
    // without a terminator is not truncated.
    auto text_end = pdata.end();
    if (pdata.back() == '\n')
        --text_end;

    std::string const msg(pdata.begin(), text_end);
    trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}

int main() {
    trace("Main thread");

    asio::thread_pool ctx(4);

    {
        auto client = std::make_shared<UDPSession>(ctx.get_executor(), "127.0.0.1", 11223);
        client->set_read_data_handler(handler_read);
        client->start();
        client->send({'A', 'B', 'C', '\n'});
    } // client stays alive through shared ownership

    ctx.join();
}

作为锦上添花,您可以在具体的执行器类型上对整个事物进行模板化,并避免类型擦除执行器类型:

template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
    using socket_t   = asio::basic_datagram_socket<udp, asio::strand<Executor>>;

在 Coliru 上查看它的在线演示(Live On Coliru)

#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>

namespace { // console tracing helpers
    // Guards std::cout so that lines from different threads do not interleave.
    std::mutex      s_console_mx;
    // Source of the small per-thread ids shown in the "th:<id>" prefix.
    std::atomic_int t_id_gen{0};
    // Id of the current thread, taken from the counter on first use in that thread.
    thread_local int t_id = ++t_id_gen;

    // Prints "th:<id> " followed by every argument and a flushing newline, under the console lock.
    template <typename... Args> void trace(Args const&... args) {
        std::lock_guard<std::mutex> guard(s_console_mx);
        std::cout << "th:" << t_id << " ";
        (std::cout << ... << args);
        std::cout << std::endl;
    }
} // namespace

// Short names for the Boost.Asio namespace and its UDP protocol class.
namespace asio = boost::asio;
using asio::ip::udp;

// Payload type for sent and received datagrams; the commented-out line is an alternative.
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
// Callback invoked with each received datagram, passed as an rvalue.
using ExternalReadHandler = std::function<void(DataBuffer&&)>;

// UDP session templated on the concrete executor type, so the executor is
// stored as `Executor` and the socket uses asio::strand<Executor> directly.
//
// Every asynchronous handler holds a shared_ptr obtained from
// shared_from_this(), so the object has to be owned by a std::shared_ptr
// (main() below creates it with std::make_shared).
// start() binds the socket to addr:port. The same m_endpoint object is also
// passed as the destination of every send and as the endpoint argument of
// every receive.
template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
    // Datagram socket whose executor type is a strand over Executor.
    using socket_t   = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
    using error_code = boost::system::error_code;
    Executor    m_ex;
    std::string m_addr;
    uint16_t    m_port;

    // Initialised from m_addr/m_port, which are declared (and so initialised) earlier.
    udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
    socket_t      m_socket{make_strand(m_ex)};

    std::deque<DataBuffer>     m_dq_send;            // datagrams waiting to be sent
    std::array<uint8_t, 65000> m_rcv_data;           // fixed receive buffer, reused for every receive
    ExternalReadHandler        external_rcv_handler; // user callback for received datagrams

    // Lets the members below call shared_from_this() without qualification.
    using std::enable_shared_from_this<UDPSession>::shared_from_this;
  public:
    UDPSession(Executor ex, std::string const& addr, uint16_t port)
        : m_ex(ex)
        , m_addr(addr)
        , m_port(port) {}

    std::string const& get_host() const { return m_addr; }
    uint16_t           get_port() { return m_port; }
    // Stores the callback that on_read() posts for each received datagram.
    void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }

    // Opens the socket for IPv4, binds it to m_endpoint and starts the first receive.
    void start() {
        m_socket.open(udp::v4());
        m_socket.bind(m_endpoint);
        do_read();
    }

    // Queues `data` for sending. The queue is only modified inside the task
    // posted to the socket's executor; `self` keeps the session alive until it ran.
    void send(DataBuffer data) {
        asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
            m_dq_send.emplace_back(std::move(d));
            // Size 1 means the queue was empty before, so no send chain is running yet.
            if (m_dq_send.size() == 1)
                send_loop();
        });
    }

  private:
    // Starts one asynchronous receive into the fixed buffer; the bound
    // shared_ptr keeps the session alive while the receive is pending.
    void do_read() {
        using namespace std::placeholders;
        m_socket.async_receive_from( //
            asio::buffer(m_rcv_data), m_endpoint,
            std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
    }

    // Receive completion handler. On success the received bytes [f, l) are copied
    // into a DataBuffer owned by a task posted to m_ex, which calls the user
    // callback through `self`; then the next receive is started. On error no
    // further receive is started.
    void on_read(error_code er, size_t bytes_transferred) {
        if (!er) {
            auto f = m_rcv_data.data(), l = f + bytes_transferred;
            asio::post(m_ex, [self = shared_from_this(), data = DataBuffer(f, l)]() mutable {
                self->external_rcv_handler(std::move(data));
            });
            do_read();
        }
    }

    // Sends the front of the queue to m_endpoint. On success the element is removed
    // and the loop continues; on error the element stays at the front and no new
    // send is started from here.
    void send_loop() {
        if (m_dq_send.empty())
            return;

        m_socket.async_send_to( //
            asio::buffer(m_dq_send.front()), m_endpoint,
            [this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
                if (!er) {
                    m_dq_send.pop_front();
                    send_loop();
                } // else { /* post to loggger */ }
            });
    }
};

// Example receive callback: traces the datagram size and its content as quoted text.
// A single trailing '\n' is left out of the text; empty datagrams are ignored.
void handler_read(DataBuffer&& pdata) {
    if (pdata.empty())
        return;

    // Drop the last byte only when it really is a newline, so a datagram
    // without a terminator is not truncated.
    auto text_end = pdata.end();
    if (pdata.back() == '\n')
        --text_end;

    std::string const msg(pdata.begin(), text_end);
    trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}

int main() {
    trace("Main thread");

    using Ex = asio::thread_pool::executor_type;
    asio::thread_pool ctx(4);

    {
        auto client = std::make_shared<UDPSession<Ex> >(ctx.get_executor(), "127.0.0.1", 11223);
        client->set_read_data_handler(handler_read);
        client->start();
        client->send({'A', 'B', 'C', '\n'});
    } // client stays alive through shared ownership

    ctx.join();
}

另一个本地演示:


¹ 您至少需要在运行器线程中进行异常处理:是否应该捕获 boost::asio::io_service::run() 抛出的异常?

© www.soinside.com 2019 - 2024. All rights reserved.