我正在尝试学习boost asio(boost 1.84,C ++ 20,Ubuntu 23.04),并对以下日间服务器示例进行了一些修改:https://www.boost.org/doc/libs/1_84_0/ doc/html/boost_asio/tutorial/tutdaytime3/src.html
我让 make_daytime_string() 休眠 5 秒来模拟繁重的工作,然后通过 std::async 外包。然后我在 std::async 的末尾发布一个 write_async ,以便稍后在与其他完成处理程序相同的线程中执行 write_async 。
我对我的手术有 2 个疑问:
1.) 是否有必要发表文章,或者是否允许直接从 std::async 调用 async_write?这里有什么优点/缺点吗?
2.) 我的变体有一个问题,即不再调用 tcp_connection dtor,并且客户端不再接收 boost::asio::error::eof。数据以 25 字节正确传输到客户端。仅当我终止服务器时,客户端才会关闭。我怀疑我的shared_ptr做错了什么?
这是改编后的代码:
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
std::future< void > futureSink;
std::string make_daytime_string()
{
using namespace std;
time_t now = time( 0 );
std::this_thread::sleep_for( std::chrono::milliseconds( 5000 ) );
return ctime( &now );
}
class tcp_connection : public std::enable_shared_from_this< tcp_connection >
{
public:
typedef std::shared_ptr< tcp_connection > pointer;
static pointer create(boost::asio::io_context& io_context)
{
return pointer( new tcp_connection( io_context ) );
}
tcp::socket& socket()
{
return socket_;
}
void start()
{
auto self = shared_from_this();
futureSink = std::async( std::launch::async,
[ self, this ]()
{
message_ = make_daytime_string();
boost::asio::post( socket_.get_executor(),
[ self ]()
{
boost::asio::async_write( self->socket_,
boost::asio::buffer( self->message_ ),
std::bind( &tcp_connection::handle_write, self,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred ) );
});
});
}
private:
tcp_connection( boost::asio::io_context& io_context ) : socket_( io_context )
{}
void handle_write(const boost::system::error_code& /*error*/, size_t /*bytes_transferred*/)
{}
tcp::socket socket_;
std::string message_;
};
class tcp_server
{
public:
tcp_server( boost::asio::io_context& io_context ) : io_context_( io_context ),
acceptor_( io_context, tcp::endpoint( tcp::v4(), 13 ) )
{
start_accept();
}
private:
void start_accept()
{
tcp_connection::pointer new_connection = tcp_connection::create( io_context_ );
acceptor_.async_accept( new_connection->socket(),
std::bind( &tcp_server::handle_accept, this, new_connection,
boost::asio::placeholders::error ) );
}
void handle_accept( tcp_connection::pointer new_connection, const boost::system::error_code& error )
{
if ( !error )
new_connection->start();
start_accept();
}
boost::asio::io_context& io_context_;
tcp::acceptor acceptor_;
};
int main()
{
try
{
boost::asio::io_context io_context;
tcp_server server( io_context );
io_context.run();
}
catch ( std::exception& e )
{
std::cerr << e.what() << std::endl;
}
return 0;
}
我找到了问题2的答案。这里捕获的shared_ptr没有被清理,因为我已经将std::future移到了全局变量(fire&forget)。补救措施是按如下方式移动shared_ptr:
void start()
{
auto self = shared_from_this();
futureSink = std::async( std::launch::async,
[ self, this ]() mutable
{
message_ = make_daytime_string();
boost::asio::post( socket_.get_executor(),
[ self = std::move( self ) ]()
{
boost::asio::async_write( self->socket_,
boost::asio::buffer( self->message_ ),
std::bind( &tcp_connection::handle_write, self,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred ) );
});
});
}
是的,这个帖子是必要的。无论
std::async
在何处运行 lambda,根据定义,它都不在服务线程(这是隐式链)上。
确实是这样。如果您使用
futureSink
,您实际上会看到析构函数再次运行:http://coliru.stacked-crooked.com/a/47d92eea20083e0f
哪里可以修复?
首先,
std::async
不是异步的。严格来说,这是一个不太可靠的std::thread
(充其量)。
你的未来是全局的,它不能很好地处理并发连接。
因为你真的只想在非服务线程上集中工作(这样你就不会阻塞 IO),所以我真的只是/这样做/。
#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using asio::ip::tcp;
static asio::thread_pool pool(4); // how about some control over our threads
static std::string simulate_work() {
std::this_thread::sleep_for(5s);
return "Work simulation done";
}
class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
public:
using pointer = std::shared_ptr<tcp_connection>;
static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }
tcp::socket& socket() { return socket_; }
void start() {
std::cerr << __PRETTY_FUNCTION__ << std::endl;
auto self = shared_from_this();
post(pool, [self, this]() {
auto m = simulate_work();
post(socket_.get_executor(),
[self, m = std::move(m)]() mutable { self->on_work_completed(std::move(m)); });
});
}
~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }
private:
tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}
void on_work_completed(std::string message) { // already on executor
message_ = std::move(message);
async_write(socket_, asio::buffer(message_),
bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
}
void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
}
tcp::socket socket_;
std::string message_;
};
class tcp_server {
public:
tcp_server(asio::io_context& io_context)
: io_context_(io_context)
, acceptor_(io_context, {tcp::v4(), 1313}) {
start_accept();
}
private:
void start_accept() {
tcp_connection::pointer new_connection = tcp_connection::create(io_context_);
acceptor_.async_accept(new_connection->socket(),
bind(&tcp_server::handle_accept, this, new_connection, _1));
}
void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
if (!error)
new_connection->start();
start_accept();
}
asio::io_context& io_context_;
tcp::acceptor acceptor_;
};
int main() {
try {
boost::asio::io_context io_context;
tcp_server server(io_context);
io_context.run();
pool.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}
如果你想要更花哨一点,你可以提供一个 Asio 风格的启动函数
async_simulate_work
:
#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using asio::ip::tcp;
static std::string simulate_work() {
std::this_thread::sleep_for(5s);
return "Work simulation done";
}
template <typename Token> auto async_simulate_work(Token&& token) {
return asio::async_initiate<Token, void(std::string)>( //
[](auto handler) {
std::thread( //
[h = std::move(handler)]() mutable { //
std::move(h)(simulate_work());
})
.detach();
},
token);
}
class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
public:
using pointer = std::shared_ptr<tcp_connection>;
static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }
tcp::socket& socket() { return socket_; }
void start() {
std::cerr << __PRETTY_FUNCTION__ << std::endl;
auto self = shared_from_this();
async_simulate_work(bind_executor(socket_.get_executor(), [self, this](std::string message) {
message_ = std::move(message);
async_write(socket_, asio::buffer(message_),
bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
}));
}
~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }
private:
tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}
void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
assert(socket_.get_executor().target<asio::io_context::executor_type>()->running_in_this_thread());
std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
}
tcp::socket socket_;
std::string message_;
};
class tcp_server {
public:
tcp_server(asio::io_context& io_context)
: io_context_(io_context)
, acceptor_(io_context, {tcp::v4(), 1313}) {
start_accept();
}
private:
void start_accept() {
tcp_connection::pointer new_connection = tcp_connection::create(io_context_);
acceptor_.async_accept(new_connection->socket(),
bind(&tcp_server::handle_accept, this, new_connection, _1));
}
void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
if (!error)
new_connection->start();
start_accept();
}
asio::io_context& io_context_;
tcp::acceptor acceptor_;
};
int main() {
try {
boost::asio::io_context io_context;
tcp_server server(io_context);
io_context.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}
另一个本地演示: