Boost.Asio daytime 服务器示例配合 std::async 使用时不能正常工作

问题描述 投票:0回答:1

我正在学习 Boost.Asio(Boost 1.84,C++20,Ubuntu 23.04),并对下面这个 daytime 服务器示例做了一些修改:https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

我让 make_daytime_string() 休眠 5 秒来模拟繁重的工作,并通过 std::async 把它放到另一个线程中执行。在 std::async 任务的末尾,我用 post 提交一个 async_write 调用,使它稍后与其他完成处理程序在同一个线程中执行。

关于我的这种做法,我有 2 个疑问:

1.) 这里的 post 是否必要?还是说可以直接在 std::async 的任务中调用 async_write?两种做法各有什么优缺点?

2.) 我的改动版本有一个问题:tcp_connection 的析构函数不再被调用,客户端也不再收到 boost::asio::error::eof。25 字节的数据能正确传输到客户端,但只有在我终止服务器时,客户端才会关闭。我怀疑是我对 shared_ptr 的用法有问题?

这是改编后的代码:

    #include <ctime>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <boost/asio.hpp>

    using boost::asio::ip::tcp;

    // Process-wide slot that receives the future returned by std::async in
    // tcp_connection::start(); every call to start() overwrites the previous value.
    // NOTE(review): while a future is parked here, its task closure (and the
    // shared_ptr captured by it) stays referenced -- this is the cause of the
    // "destructor is never called" symptom described in the question.
    std::future< void > futureSink;

    /// Builds the daytime reply: the current time formatted by std::ctime(),
    /// e.g. "Thu Apr  4 16:08:00 2024\n" (25 characters including the newline).
    ///
    /// @param delay  Artificial pause that simulates heavy work before the
    ///               result is returned. Defaults to the 5000 ms used by the
    ///               example; pass 0 ms for an immediate answer.
    /// @return       The timestamp text. The time is sampled BEFORE the pause,
    ///               so the reply reflects when the work started.
    ///
    /// NOTE(review): std::ctime() formats into a shared static buffer; calling
    /// this function from several threads at once is presumably unsafe -- confirm
    /// against the C library in use.
    std::string make_daytime_string( std::chrono::milliseconds delay = std::chrono::milliseconds( 5000 ) )
    {
        const std::time_t now = std::time( nullptr );
        std::this_thread::sleep_for( delay );
        return std::ctime( &now );
    }

    // One client connection: owns the socket and the reply text.
    // Instances are always held through std::shared_ptr (see create()), so that
    // asynchronous callbacks can keep the object alive via shared_from_this().
    class tcp_connection : public std::enable_shared_from_this< tcp_connection >
    {
    public:
        typedef std::shared_ptr< tcp_connection > pointer;

        // Factory: the constructor is private, so this is the only way to obtain
        // a connection, and it is always wrapped in a shared_ptr.
        static pointer create(boost::asio::io_context& io_context)
        {
            return pointer( new tcp_connection( io_context ) );
        }

        // Gives the acceptor access to the socket that it should accept into.
        tcp::socket& socket()
        {
            return socket_;
        }

        // Starts producing the reply:
        //  1. make_daytime_string() runs inside a std::async task,
        //  2. that task posts a function to the socket's executor,
        //  3. the posted function starts the async_write of message_.
        void start()
        {
            auto self = shared_from_this();

            // The returned future is parked in the global futureSink.
            // NOTE(review): this outer closure holds a COPY of `self`; per the
            // question, the connection is therefore not destroyed once the write
            // has completed, because the closure lives on with the stored future.
            futureSink = std::async( std::launch::async,
                [ self, this ]()
                {
                    // Written from the std::async task, before the post below.
                    message_ = make_daytime_string();

                    // Hand the actual socket operation over to the socket's executor.
                    boost::asio::post( socket_.get_executor(),
                        [ self ]()
                        {
                            // `self` is bound into the completion handler, so the
                            // buffer (message_) outlives the write operation.
                            boost::asio::async_write( self->socket_,
                                                      boost::asio::buffer( self->message_ ),
                                                      std::bind( &tcp_connection::handle_write, self,
                                                                 boost::asio::placeholders::error,
                                                                 boost::asio::placeholders::bytes_transferred ) );
                        });
                });
        }
    private:
        tcp_connection( boost::asio::io_context& io_context ) : socket_( io_context )
        {}

        // Completion handler of the write; intentionally does nothing and
        // ignores both the error code and the byte count.
        void handle_write(const boost::system::error_code& /*error*/, size_t /*bytes_transferred*/)
        {}

        tcp::socket socket_;
        std::string message_; // reply text; must stay alive while async_write runs
    };

    class tcp_server
    {
    public:
        tcp_server( boost::asio::io_context& io_context ) : io_context_( io_context ),
            acceptor_( io_context, tcp::endpoint( tcp::v4(), 13 ) )
        {
            start_accept();
        }

    private:
        void start_accept()
        {
            tcp_connection::pointer new_connection = tcp_connection::create( io_context_ );

            acceptor_.async_accept( new_connection->socket(),
                                    std::bind( &tcp_server::handle_accept, this, new_connection,
                                               boost::asio::placeholders::error ) );
        }

        void handle_accept( tcp_connection::pointer new_connection, const boost::system::error_code& error )
        {
            if ( !error )
                new_connection->start();

            start_accept();
        }

        boost::asio::io_context& io_context_;
        tcp::acceptor acceptor_;
    };

    // Program entry: builds the io_context and the server, then runs the
    // event loop on this (single) thread.
    //
    // Returns 0 when the event loop ends normally and 1 when an exception
    // escaped from setup or from a handler (for example when binding the
    // listening port fails), so that callers can detect the failure.
    int main()
    {
        try
        {
            boost::asio::io_context io_context;
            tcp_server server( io_context );
            io_context.run();
        }
        catch ( const std::exception& e )
        {
            std::cerr << e.what() << std::endl;
            return 1; // report the failure instead of exiting with success
        }

        return 0;
    }

编辑2023年4月7日16:08

我找到了问题2的答案。这里捕获的shared_ptr没有被清理,因为我已经将std::future移到了全局变量(fire&forget)。补救措施是按如下方式移动shared_ptr:

// Variant of tcp_connection::start() in which the std::async task gives up its
// reference to the connection: the inner lambda MOVES `self` out of the outer
// closure, so the task stored behind futureSink no longer owns the connection.
void start()
{
    auto self = shared_from_this();

    // `mutable` is required so that the captured copy of `self` can be moved from.
    futureSink = std::async( std::launch::async,
                            [ self, this ]() mutable
                            {
                                message_ = make_daytime_string();

                                // From here on the posted function is the owner;
                                // the outer closure's `self` is left empty.
                                boost::asio::post( socket_.get_executor(),
                                                  [ self = std::move( self ) ]()
                                                  {
                                                      // The bound handler copies `self`, keeping the
                                                      // connection (and message_) alive during the write.
                                                      boost::asio::async_write( self->socket_,
                                                                               boost::asio::buffer( self->message_ ),
                                                                               std::bind( &tcp_connection::handle_write, self,
                                                                                         boost::asio::placeholders::error,
                                                                                         boost::asio::placeholders::bytes_transferred ) );
                                                  });
                            });
} 
c++ asynchronous boost boost-asio shared-ptr
1个回答
0
投票
  1. 是的,这里的 post 是必要的。无论 `std::async` 在哪个线程上运行该 lambda,按定义它都不在服务线程(也就是隐式 strand)上。

  2. 确实如此。如果您使用 `futureSink`,实际上会看到析构函数再次运行:http://coliru.stacked-crooked.com/a/47d92eea20083e0f


有哪些地方需要修正?

首先,`std::async` 并不是真正的"异步"。严格来说,它充其量只是一个不太可靠的 `std::thread`。

你的 future 是全局变量,它无法很好地处理并发连接。

既然你真正想做的只是把工作放到非服务线程组成的线程池中执行(这样就不会阻塞 IO),那我就直接这样做。

Live On Coliru(在线演示)

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using asio::ip::tcp;

// Worker pool constructed with 4 threads. tcp_connection::start() posts the
// blocking simulate_work() call here so that the io_context thread is never
// blocked; main() joins the pool after the event loop has ended.
static asio::thread_pool pool(4); // how about some control over our threads

// Stand-in for an expensive, blocking computation.
//
// @param delay  How long the calling thread is blocked. Defaults to the
//               5 seconds used by the example; pass 0 ms to skip the pause.
// @return       The fixed text "Work simulation done".
static std::string simulate_work(std::chrono::milliseconds delay = std::chrono::seconds(5)) {
    std::this_thread::sleep_for(delay);
    return "Work simulation done";
}

class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
  public:
    using pointer = std::shared_ptr<tcp_connection>;

    static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }

    tcp::socket& socket() { return socket_; }

    void start() {
        std::cerr << __PRETTY_FUNCTION__ << std::endl;
        auto self = shared_from_this();

        post(pool, [self, this]() {
            auto m = simulate_work();
            post(socket_.get_executor(),
                 [self, m = std::move(m)]() mutable { self->on_work_completed(std::move(m)); });
        });
    }

    ~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }

  private:
    tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}

    void on_work_completed(std::string message) { // already on executor
        message_ = std::move(message);
        async_write(socket_, asio::buffer(message_),
                    bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
    }

    void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
        std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
    }

    tcp::socket socket_;
    std::string message_;
};

class tcp_server {
  public:
    tcp_server(asio::io_context& io_context)
        : io_context_(io_context)
        , acceptor_(io_context, {tcp::v4(), 1313}) {
        start_accept();
    }

  private:
    void start_accept() {
        tcp_connection::pointer new_connection = tcp_connection::create(io_context_);

        acceptor_.async_accept(new_connection->socket(),
                               bind(&tcp_server::handle_accept, this, new_connection, _1));
    }

    void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
        if (!error)
            new_connection->start();

        start_accept();
    }

    asio::io_context& io_context_;
    tcp::acceptor     acceptor_;
};

// Program entry: runs the server's event loop on this thread, then waits for
// the worker pool to finish.
//
// Returns 0 on a normal shutdown and 1 when an exception escaped from setup or
// from a handler (e.g. the listening port could not be bound), so that the
// failure is visible in the exit status.
int main() {
    try {
        boost::asio::io_context io_context;

        tcp_server server(io_context);
        io_context.run();
        pool.join();
    } catch (std::exception const& e) {
        std::cerr << e.what() << std::endl;
        return 1; // do not report success after a fatal error
    }

    return 0;
}

附加内容 / 进阶写法

如果你想做得更讲究一点,可以提供一个 Asio 风格的发起函数(initiating function)`async_simulate_work`:

Live On Coliru(在线演示)

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using asio::ip::tcp;

// Stand-in for an expensive, blocking computation.
//
// @param delay  How long the calling thread is blocked. Defaults to the
//               5 seconds used by the example; pass 0 ms to skip the pause.
// @return       The fixed text "Work simulation done".
static std::string simulate_work(std::chrono::milliseconds delay = std::chrono::seconds(5)) {
    std::this_thread::sleep_for(delay);
    return "Work simulation done";
}

// Asio-style initiating function for simulate_work().
//
// The blocking work runs on its own detached thread. When it is done, the
// completion handler is invoked with the resulting std::string THROUGH THE
// HANDLER'S ASSOCIATED EXECUTOR (e.g. the one attached with bind_executor), so
// the handler never runs on the worker thread unless it has no executor of its
// own (the default system executor then runs it inline).
//
// @param token  Any completion token compatible with void(std::string).
// @return       Whatever the token dictates (nothing for a plain callback).
template <typename Token> auto async_simulate_work(Token&& token) {
    return asio::async_initiate<Token, void(std::string)>( //
        [](auto handler) {
            // Track outstanding work on the handler's executor so that it does
            // not consider itself idle while the result is still being produced.
            auto work = asio::make_work_guard(asio::get_associated_executor(handler));

            std::thread(                                                         //
                [h = std::move(handler), work = std::move(work)]() mutable {     //
                    auto result = simulate_work();

                    // Calling `h` directly here would ignore its associated
                    // executor; dispatch hands the call over to it instead.
                    asio::dispatch(work.get_executor(),
                                   [h = std::move(h), r = std::move(result)]() mutable { //
                                       std::move(h)(std::move(r));
                                   });
                })
                .detach();
        },
        token);
}

class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
  public:
    using pointer = std::shared_ptr<tcp_connection>;

    static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }

    tcp::socket& socket() { return socket_; }

    void start() {
        std::cerr << __PRETTY_FUNCTION__ << std::endl;
        auto self = shared_from_this();

        async_simulate_work(bind_executor(socket_.get_executor(), [self, this](std::string message) {
            message_ = std::move(message);
            async_write(socket_, asio::buffer(message_),
                        bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
        }));
    }

    ~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }

  private:
    tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}

    void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
        assert(socket_.get_executor().target<asio::io_context::executor_type>()->running_in_this_thread());
        std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
    }

    tcp::socket socket_;
    std::string message_;
};

class tcp_server {
  public:
    tcp_server(asio::io_context& io_context)
        : io_context_(io_context)
        , acceptor_(io_context, {tcp::v4(), 1313}) {
        start_accept();
    }

  private:
    void start_accept() {
        tcp_connection::pointer new_connection = tcp_connection::create(io_context_);

        acceptor_.async_accept(new_connection->socket(),
                               bind(&tcp_server::handle_accept, this, new_connection, _1));
    }

    void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
        if (!error)
            new_connection->start();

        start_accept();
    }

    asio::io_context& io_context_;
    tcp::acceptor     acceptor_;
};

// Program entry: runs the server's event loop on this thread.
//
// Returns 0 on a normal shutdown and 1 when an exception escaped from setup or
// from a handler (e.g. the listening port could not be bound), so that the
// failure is visible in the exit status.
int main() {
    try {
        boost::asio::io_context io_context;

        tcp_server server(io_context);
        io_context.run();
    } catch (std::exception const& e) {
        std::cerr << e.what() << std::endl;
        return 1; // do not report success after a fatal error
    }

    return 0;
}

另一个本地演示:

© www.soinside.com 2019 - 2024. All rights reserved.