Boost Asio 异步客户端

问题描述 投票:0回答:1

我正在尝试使用 Boost asio 编写一个异步客户端, 我写了以下代码,

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/beast/http.hpp>
#include <boost/json/src.hpp>
namespace json = boost::json;
namespace asio = boost::asio;
namespace this_coro = asio::this_coro;
using asio::ip::tcp;
using boost::system::error_code;
using namespace asio::experimental::awaitable_operators;

#include <iostream>
using namespace std;




template <typename T> using Defer = asio::deferred_t::as_default_on_t<T>;
using Socket = Defer<tcp::socket>;
using Acceptor = Defer<tcp::acceptor>;



using boost::asio::ip::tcp;

class TCPClient {
public:
    TCPClient(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator)
        : io_service_(io_service), socket_(io_service) {
        connect(endpoint_iterator);
    }

    void write(const std::string& message) {
        io_service_.post(boost::bind(&TCPClient::do_write, this, message));
    }

    void close() {
        io_service_.post(boost::bind(&TCPClient::do_close, this));
    }

private:
    void connect(tcp::resolver::iterator endpoint_iterator) {
        boost::asio::async_connect(socket_, endpoint_iterator,
            boost::bind(&TCPClient::handle_connect, this,
                boost::asio::placeholders::error));
    }

    void handle_connect(const boost::system::error_code& error) {
        if (!error) {
            cout << "connected" << endl;
            write("Hello, World!");
            read();
        }
        else {
            cout << "connection failed" << endl;
        }
    }

    void do_write(const std::string& message) {
        boost::asio::async_write(socket_,
            boost::asio::buffer(message),
            boost::bind(&TCPClient::handle_write, this,
                boost::asio::placeholders::error));
    }

    void handle_write(const boost::system::error_code& error) {
        if (!error) {
            cout << "write successfully" << endl;
        }
        else {
            cout << "write failed " << error << endl;
        }
    }

    void read() {
        socket_.async_read_some(boost::asio::buffer(data_, max_length),
            boost::bind(&TCPClient::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));

        //socket_.async_read_some(buf.prepare(1024),
        //    boost::bind(&TCPClient::handle_read, this,
        //        boost::asio::placeholders::error,
        //        boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
        if (!error) {
            std::cout << "Received: " << bytes_transferred << std::endl;
            read();
        }
        else {
            // Handle the error
            std::cout << "read failed " << error << std::endl;
        }
    }

    void do_close() {
        socket_.close();
    }

    boost::asio::io_service& io_service_;
    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length] = { 0 };
    boost::asio::streambuf buf;
};

int main() {
    boost::asio::io_service io_service;
    tcp::resolver resolver(io_service);
    tcp::resolver::iterator endpoint_iterator = resolver.resolve("localhost", "8989");
    TCPClient client(io_service, endpoint_iterator);
    io_service.run();

    while (1) {}

    return 0;
}

客户端可以成功连接,并且正在调用处理程序。使用wireshark,我还可以看到 write() 函数发送“Hello, World”消息并收到回复,但写入或读取处理程序都没有被调用。

我尝试了不同的东西,看到了不同的示例代码,但我无法弄清楚。还尝试使用 coroutine 编写相同的代码,代码可以在下面找到,但是,我是异步套接字和 Boost Asio 的新手,我不确定我还在做什么(但正在努力实现) ,

#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>

using boost::asio::ip::tcp;
namespace asio = boost::asio;
using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;

awaitable<void> connectAndCommunicate() {
    try {
        asio::io_context io_context;
        tcp::socket socket(io_context);

        co_await socket.async_connect(tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 12345), use_awaitable);

        std::string message = "Hello, World!";
        co_await asio::async_write(socket, asio::buffer(message), use_awaitable);

        char data[128];
        std::size_t length = co_await socket.async_read_some(asio::buffer(data), use_awaitable);
        std::cout << "Received: " << std::string(data, length) << std::endl;

    } catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

int main() {
    asio::io_context io_context;
    co_spawn(io_context, connectAndCommunicate(), detached);
    io_context.run();
    return 0;
}

我希望更有经验的开发人员指出我的错误以及如何最好地做我想做的事情。

谢谢

c++ asynchronous boost c++20 boost-asio
1个回答
0
投票

第一版回顾

  1. 不要使用using命名空间
  2. 不要使用 boost::bind,当然也不要使用过时的
    boost/bind.hpp
    include
  3. 不要包含大量不需要的标头,删除未使用的类型别名
  4. 使用您拥有的缩写命名空间
    • 使用
      io_context
      代替已弃用的
      io_service
    • 不要传递对 io 上下文的引用;而是使用执行器
  5. 建议不要泄漏接口中的实现细节(将主机/端口作为构造函数参数,而不是某种特定类型的迭代器)
  6. 在链上时,不需要
    post
    ,因此在
    handle_connect
    中执行
    do_write()
    ,而不是
    write()

现在我们的线路从 120 条减少到 69 条:Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
namespace asio = boost::asio;
using namespace std::placeholders;
using asio::ip::tcp;
using boost::system::error_code;

class TCPClient {
  public:
    TCPClient(asio::io_context& ioc, std::string const& host, std::string const& port) : socket_(ioc) {
        tcp::resolver resolver(ioc);
        connect(resolver.resolve(host, port));
    }

    void write(std::string const& message) {
        post(socket_.get_executor(), std::bind(&TCPClient::do_write, this, message));
    }

    void close() { post(socket_.get_executor(), std::bind(&TCPClient::do_close, this)); }

  private:
    void connect(tcp::resolver::iterator endpoint_iterator) {
        async_connect(socket_, endpoint_iterator, std::bind(&TCPClient::handle_connect, this, _1));
    }

    void handle_connect(error_code ec) {
        if (!ec) {
            std::cout << "connected" << std::endl;
            do_write("Hello, World!");
            read();
        } else {
            std::cout << "connection failed (" << ec.message() << ")" << std::endl;
        }
    }

    void do_write(std::string const& message) {
        async_write(socket_, asio::buffer(message), std::bind(&TCPClient::handle_write, this, _1));
    }

    void handle_write(error_code ec) { std::cout << "write " << ec.message() << std::endl; }

    void read() {
        socket_.async_read_some(asio::buffer(data_), std::bind(&TCPClient::handle_read, this, _1, _2));

        // socket_.async_read_some(buf.prepare(1024), std::bind(&TCPClient::handle_read, this, _1, _2));
    }

    void handle_read(error_code ec, size_t bytes_transferred) {
        std::cout << "Read " << ec.message() << " (" << bytes_transferred << ")" << std::endl;
        if (!ec) {
            std::cout << "Received: " << bytes_transferred << std::endl;
            read();
        } else {
            // Handle the error
        }
    }

    void do_close() { socket_.close(); }

    tcp::socket            socket_;
    std::array<char, 1024> data_{0};
    // asio::streambuf     buf;
};

int main() {
    asio::io_context ioc;
    TCPClient        client(ioc, "localhost", "8989");
    ioc.run();
}

我可能无意中解决了一个我没有有意识提到的问题,但对我来说,它看起来只是有效(TM):

第二版审核

已经干净很多了!我建议

  • 删除额外的
    io_context
    ,你在一个协程中,根据定义有一个上下文。从
    this_coro
  • 获取执行者
  • 使用
    deferred
    而不是
    use_awaitable
    来提高效率
  • 不使用原始 C 数组

为了更接近第一个列表:

  • 使用相同的缓冲区大小
  • 还获取主机/端口信息并像第一个清单中那样解决它
  • 循环读取

我们得到:在 Coliru 上生活

#include <boost/asio.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <iostream>

using boost::asio::ip::tcp;
namespace asio = boost::asio;

asio::awaitable<void> connectAndCommunicate(std::string host, std::string port) {
    try {
        auto ex = co_await asio::this_coro::executor;
        auto token = asio::deferred;

        tcp::socket s(ex);
        tcp::resolver resolver(ex);
        co_await async_connect(s, co_await resolver.async_resolve(host, port, token), token);

        std::string message = "Hello, World!";
        co_await asio::async_write(s, asio::buffer(message), token);

        for (;;) {
            std::array<char, 1024> buf;
            auto length = co_await s.async_read_some(asio::buffer(buf), token);
            std::cout << "Received: " << quoted(std::string_view(buf.data(), length)) << std::endl;
        }
    } catch (boost::system::system_error const& se) {
        std::cerr << "Exception: " << se.code().message() << std::endl;
    }
}

int main() {
    asio::io_context ioc;
    co_spawn(ioc, connectAndCommunicate("127.0.0.1", "8989"), asio::detached);
    ioc.run();
}

  • 我会考虑将
    deferred
    设置为默认执行器。讽刺的是,第一个代码清单定义了那些甚至不相关的类型别名:) Live On Coliru

未解之谜

事情被掩盖了,因为它们不是你问题的一部分:

  • 链和输出消息队列(现在无论如何你都没有多个输出消息)
  • 消息框架。如果您需要解释消息,
    async_read_some
    是一个糟糕的方法。您将收到部分/串联的消息。这就是组合操作和动态缓冲区存在的原因。
  • 错误处理。我稍微改进了它,但你需要意识到部分成功(特别是组合读取,但也许
    async_read_some
    可以返回字节 EOF 本身)
© www.soinside.com 2019 - 2024. All rights reserved.