我已经使用多线程环境实现了 udp 会话。
using RawDataArray=std::array <unsigned char,65000>;
class StaticBuffer
{
private:
RawDataArray m_data;
std::size_t m_n_avail;
public:
StaticBuffer():m_data(),m_n_avail(0){}
StaticBuffer(std::size_t n_bytes){m_n_avail=n_bytes;}
StaticBuffer(const StaticBuffer& other)
{
std::cout<<"ctor cpy\n";
m_data=other.m_data;
m_n_avail=other.m_n_avail;
}
StaticBuffer(const StaticBuffer& other,std::size_t n_bytes)
{
std::cout<<"ctor cpy\n";
m_data=other.m_data;
m_n_avail=n_bytes;
}
StaticBuffer(const RawDataArray& data,std::size_t n_bytes)
{
std::cout<<"ctor static buff\n";
m_data=data;
m_n_avail=n_bytes;
}
void set_size(int n)
{
m_n_avail=n;
}
void set_max_size(){m_n_avail=m_data.size();}
std::size_t max_size()const {return m_data.size();}
unsigned char& operator[](std::size_t i){return m_data[i];}
const unsigned char& operator[] (std::size_t i)const{return m_data[i];}
StaticBuffer& operator=(const StaticBuffer& other)
{
if (this == &other)
return *this;
m_data = other.m_data;
m_n_avail = other.m_n_avail;
return *this;
}
void push_back(unsigned char val)
{
if (m_n_avail<m_data.size())
{
m_data[m_n_avail]=val;
}else
throw "Out of memory";
}
void reset(){m_n_avail=0;}
unsigned char* data(){return m_data.data();}
const unsigned char* data()const {return m_data.data();}
std::size_t size()const{return m_n_avail;}
~StaticBuffer(){}
};
class UDPSeassion;
using DataBuffer = StaticBuffer;
using DataBufferPtr=std::unique_ptr<DataBuffer>;
using ExternalReadHandler=std::function<void(DataBufferPtr)>;
class UDPSeassion:public std::enable_shared_from_this<UDPSeassion>
{
private:
asio::io_context& m_ctx;
asio::ip::udp::socket m_socket;
asio::ip::udp::endpoint m_endpoint;
std::string m_addr;
unsigned short m_port;
asio::io_context::strand m_send_strand;
std::deque<DataBufferPtr> m_dq_send;
asio::io_context::strand m_rcv_strand;
DataBufferPtr m_rcv_data;
ExternalReadHandler external_rcv_handler;
private:
void do_send_data_from_dq()
{
if (m_dq_send.empty())
return;
m_socket.async_send_to(
asio::buffer(m_dq_send.front()->data(),m_dq_send.front()->size()),
m_endpoint,
asio::bind_executor(m_send_strand,[this](const boost::system::error_code& er, std::size_t bytes_transferred){
if (!er)
{
m_dq_send.pop_front();
do_send_data_from_dq();
}else
{
//post to loggger
}
}));
}
void do_read(const boost::system::error_code& er, std::size_t bytes_transferred)
{
if (!er)
{
m_rcv_data->set_size(bytes_transferred);
asio::post(m_ctx,[this,data=std::move(m_rcv_data)]()mutable{ external_rcv_handler(std::move(data));});
m_rcv_data=std::make_unique<DataBuffer>();
m_rcv_data->set_max_size();
async_read();
}
}
public:
UDPSeassion(asio::io_context& ctx,const std::string& addr, unsigned short port):
m_ctx(ctx),
m_socket(ctx),
m_endpoint(asio::ip::address::from_string(addr),port),
m_addr(addr),
m_port(port),
m_send_strand(ctx),
m_dq_send(),
m_rcv_strand(ctx),
m_rcv_data(std::make_unique<DataBuffer>(65000))
{}
~UDPSeassion(){}
const std::string& get_host()const{return m_addr;}
unsigned short get_port(){return m_port;}
template<typename ExternalReadHandlerCallable>
void set_read_data_headnler(ExternalReadHandlerCallable&& handler)
{
external_rcv_handler=std::forward<ExternalReadHandlerCallable>(handler);
}
void start()
{
m_socket.open(asio::ip::udp::v4());
async_read();
}
void async_read()
{
m_socket.async_receive_from(
asio::buffer(m_rcv_data->data(),m_rcv_data->size()),
m_endpoint,
asio::bind_executor(m_rcv_strand,std::bind(&UDPSeassion::do_read,this,std::placeholders::_1,std::placeholders::_2) )
);
}
void async_send(DataBufferPtr pData)
{
asio::post(m_ctx,
asio::bind_executor(m_send_strand,[this,pDt=std::move(pData)]()mutable{
m_dq_send.emplace_back(std::move(pDt));
if (m_dq_send.size()==1)
do_send_data_from_dq();
}));
}
};
void handler_read(DataBufferPtr pdata)
{
// decoding raw_data -> decod_data
// lock mutext
// queue.push_back(decod_data)
// unlock mutext
//for view pdata
std::stringstream ss;
ss<<"thread handler: "<<std::this_thread::get_id()<<" "<<pdata->data()<<" "<<pdata->size()<<std::endl;
std::cout<<ss.str()<<std::endl;
}
int main()
{
asio::io_context ctx;
//auto work_guard = asio::make_work_guard(ctx);
std::cout<<"MAIN thread: "<<std::this_thread::get_id()<<std::endl;
StaticBuffer b{4};
b[0]='A';
b[1]='B';
b[2]='C';
b[4]='\n';
UDPSeassion client(ctx,"127.0.0.1",11223);
client.set_read_data_headnler(handler_read);
client.start();
std::vector<std::thread> threads;
for (int i=0;i<3;++i)
{
threads.emplace_back([&](){
std::stringstream ss;
ss<<"run thread: "<<std::this_thread::get_id()<<std::endl;
std::cout<<ss.str();
ctx.run();
std::cout<<"end thread\n";
}
);
}
client.async_send(std::make_unique<StaticBuffer>(b));
ctx.run();
for (auto& t:threads)
t.join();
return 1;
}
在上面的代码中,主要强调的是UDPSeasion类。编写 StaticBuffer 类是为了执行主要功能。我有一些问题:
我将非常感谢您对我的问题的详细回答。非常感谢))
我会简化很多。
您“使用”
enable_shared_from_this
,但没有任何异步操作捕获shared_from_this
。事实上,您甚至没有分配 UDPSession
共享,因此使用 shared_from_this
根本就是未定义行为。
隐含了无操作析构函数。如果您必须声明它们,
= default
它们
m_rcv_strand
已弃用 - 使用 strand<>
代替
为什么有单独的发送/接收链?当然,允许 1 个读取操作与 1 个写入操作重叠,但是如果没有适当的同步,您仍然无法访问共享对象(如
m_socket
)
您有股,但似乎错误地没有在相关的地方发布到它们(例如
post(m_ctx, bind_executor(m_send_strand, ....))
是冲突的)
您有一个费力的缓冲区类型,/似乎/旨在避免分配,但无论如何您都将其包装在 unique_ptr 中 ́\(ツ)/́
set_read_data_handler
不需要是模板。既然你无论如何都要擦除到 std::function
,那么与仅使用相比,好处为零:
void set_read_data_handler(ExternalReadHandler handler) {
external_rcv_handler = std::move(handler);
}
您有重复的魔法常数(例如
65000
)
您似乎缺少一个套接字
bind()
调用
简而言之,我会用一些合理的东西替换缓冲区:
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);
由于您似乎期望使用文本协议,因此您的平均消息可能会(小得多)小,所以我认为仅使用
甚至std::string
可能会更快。boost::container::small_vector<...>
并不是真正必需的,但为了优雅的、asio 标准的使用:
using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
查看简化版本 Live On Coliru
#include <boost/asio.hpp>
#include <boost/container/static_vector.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
#include <list>
#include <thread>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);
// not really required but to allow for elegant, asio-standard use:
using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
using ExternalReadHandler = std::function<void(StaticBuffer&&)>;
class UDPSession {
private:
using error_code = boost::system::error_code;
asio::any_io_executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
udp::socket m_socket{make_strand(m_ex)};
std::deque<StaticBuffer> m_dq_send;
StaticBuffer m_rcv_data;
ExternalReadHandler external_rcv_handler;
public:
UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(StaticBuffer data) {
asio::post(m_socket.get_executor(), [this, d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
m_rcv_data.assign(m_rcv_data.static_capacity, '\0');
m_socket.async_receive_from(
buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, this, std::placeholders::_1, std::placeholders::_2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
m_rcv_data.resize(bytes_transferred);
asio::post(m_ex, [this, data = std::move(m_rcv_data)]() mutable {
external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to(buffer(m_dq_send.front()), m_endpoint,
[this](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to loggger */ }
});
}
};
void handler_read(StaticBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
asio::io_context ctx;
auto work_guard = asio::make_work_guard(ctx);
trace("Main thread");
std::list<std::thread> threads;
for (int i = 0; i < 3; ++i)
threads.emplace_back([&]() {
trace("START");
ctx.run();
trace("END");
});
UDPSession client(ctx.get_executor(), "127.0.0.1", 11223);
client.set_read_data_handler(handler_read);
client.start();
client.send({'A', 'B', 'C', '\n'});
work_guard.reset();
for (auto& t : threads)
t.join();
}
Coliru 上的现场演示“吃掉”了
main.cpp
中的单词。这是本地词典演示:
shared_from_this
您可能已经注意到我改为
any_io_executor
而不是io_context&
。这样您就可以轻松切换到asio::thread_pool
,而不是手动执行(效果很差)。
我们也重新设置
shared_from_this
,但这次是正确的。
为了简单起见,我仅使用静态缓冲区作为接收缓冲区(因为这就是数据报协议的滚动方式),并且仅使用
vector
(或 small_vector
)作为 DataBuffer
。
#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;
class UDPSession : public std::enable_shared_from_this<UDPSession> {
private:
using error_code = boost::system::error_code;
asio::any_io_executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
udp::socket m_socket{make_strand(m_ex)};
std::deque<DataBuffer> m_dq_send;
std::array<uint8_t, 65000> m_rcv_data;
ExternalReadHandler external_rcv_handler;
public:
UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(DataBuffer data) {
asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
using namespace std::placeholders;
m_socket.async_receive_from( //
asio::buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
asio::post(
m_ex,
[this, self=shared_from_this(),
data = DataBuffer(m_rcv_data.data(), m_rcv_data.data() + bytes_transferred)]() mutable {
external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to( //
asio::buffer(m_dq_send.front()), m_endpoint,
[this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to loggger */ }
});
}
};
void handler_read(DataBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
trace("Main thread");
asio::thread_pool ctx(4);
{
auto client = std::make_shared<UDPSession>(ctx.get_executor(), "127.0.0.1", 11223);
client->set_read_data_handler(handler_read);
client->start();
client->send({'A', 'B', 'C', '\n'});
} // client stays alive through shared ownership
ctx.join();
}
作为锦上添花,您可以在具体的执行器类型上对整个事物进行模板化,并避免类型擦除执行器类型:
template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
using socket_t = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
看到它在Coliru上直播
#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;
template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
using socket_t = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
using error_code = boost::system::error_code;
Executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
socket_t m_socket{make_strand(m_ex)};
std::deque<DataBuffer> m_dq_send;
std::array<uint8_t, 65000> m_rcv_data;
ExternalReadHandler external_rcv_handler;
using std::enable_shared_from_this<UDPSession>::shared_from_this;
public:
UDPSession(Executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(DataBuffer data) {
asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
using namespace std::placeholders;
m_socket.async_receive_from( //
asio::buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
auto f = m_rcv_data.data(), l = f + bytes_transferred;
asio::post(m_ex, [self = shared_from_this(), data = DataBuffer(f, l)]() mutable {
self->external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to( //
asio::buffer(m_dq_send.front()), m_endpoint,
[this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to loggger */ }
});
}
};
void handler_read(DataBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
trace("Main thread");
using Ex = asio::thread_pool::executor_type;
asio::thread_pool ctx(4);
{
auto client = std::make_shared<UDPSession<Ex> >(ctx.get_executor(), "127.0.0.1", 11223);
client->set_read_data_handler(handler_read);
client->start();
client->send({'A', 'B', 'C', '\n'});
} // client stays alive through shared ownership
ctx.join();
}
另一个本地演示:
¹ 您至少需要在运行器线程中进行异常处理:是否应该捕获 boost::asio::io_service::run() 抛出的异常?