boost::asio async_write interleaving in googletest


I have been trying to write a TCP server using boost::asio that sends data to an arbitrary number of connected clients, and I have been writing some tests for it with Google Test.

I'm having trouble with the test below, which is intended to detect interleaving: it fills 5 buffers, using a distinct value per buffer, sends them all, and on the read side uses a map to count how many distinct values appear in each received buffer.

I'm aware that async_write calls async_write_some under the hood, and that strands are therefore recommended to avoid interleaving, since they guarantee sequential execution of handlers on the same strand.
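In rough terms, the composed operation behaves like the sketch below (a conceptual illustration only, not Asio's actual implementation; the name async_write_sketch is made up):

// Conceptual sketch, NOT Asio's real code. boost::asio::async_write keeps
// issuing async_write_some calls until every byte has been written; two
// overlapping composed writes can therefore interleave their chunks.
#include <boost/asio.hpp>
#include <functional>

inline void async_write_sketch(boost::asio::ip::tcp::socket& s, boost::asio::const_buffer buf,
                               std::function<void(boost::system::error_code)> done) {
    if (buf.size() == 0)
        return done({});
    s.async_write_some(buf, [&s, buf, done](boost::system::error_code ec, std::size_t n) {
        if (ec)
            return done(ec);
        async_write_sketch(s, buf + n, done); // continue with the remaining bytes
    });
}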

That doesn't seem to be happening here, and I'm not sure why. Any help would be appreciated.

#include <boost/asio.hpp>
#include <gtest/gtest.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

using tcp = boost::asio::ip::tcp;

// tcp_session.h
using boost_tcp = boost::asio::ip::tcp;

class tcp_session : public std::enable_shared_from_this<tcp_session> {
  public:
    explicit tcp_session(boost_tcp::socket socket);
    ~tcp_session();

    void   async_write(char const* data, size_t size);
    void   async_write(boost::asio::const_buffer buff);
    size_t write(char const* data, size_t size);
    size_t write(boost::asio::const_buffer buff);

    void close();

  private:
    boost_tcp::socket                                     m_socket;
    std::mutex                                            m_socket_mutex{};
    boost::asio::strand<boost_tcp::socket::executor_type> m_strand;

    void handle_async_write(boost::system::error_code const& err, size_t bytes_transferred);
};

// tcp_svr.h
class tcp_svr {
  public:
    explicit tcp_svr(int16_t port);
    ~tcp_svr();

    void async_write(char const* data, size_t size);
    void async_write(boost::asio::const_buffer buff);
    void run();
    void stop();

    size_t              get_session_count();
    boost_tcp::endpoint get_local_endpoint() { return m_acceptor.local_endpoint(); }

  private:
    using session_ptr = std::shared_ptr<tcp_session>;
    void                     do_accept();
    std::atomic_bool         m_stopped;
    std::vector<session_ptr> m_sessions{};
    std::mutex               m_sessions_mutex{};
    boost::asio::io_service  m_io_service{};
    boost_tcp::acceptor      m_acceptor;
};
// tcp_svr.cpp

tcp_svr::tcp_svr(int16_t port) : m_acceptor(m_io_service, tcp::endpoint(tcp::v4(), port)) { do_accept(); }

tcp_svr::~tcp_svr() { stop(); }

void tcp_svr::async_write(boost::asio::const_buffer buff) {
    std::lock_guard lock(m_sessions_mutex);
    for (auto const& client : m_sessions) {
        client->async_write(buff);
    }
}

void tcp_svr::run() { m_io_service.run(); }

void tcp_svr::stop() {
    if (m_stopped)
        return;

    m_stopped   = true;
    auto thread = std::thread{[this]() { m_acceptor.cancel(); }};
    thread.join();
    {
        std::lock_guard lock(m_sessions_mutex);
        for (auto const& client : m_sessions) {
            client->close();
        }

        m_sessions.clear();
    }

    m_io_service.stop();
}

size_t tcp_svr::get_session_count() {
    std::lock_guard lock(m_sessions_mutex);
    return m_sessions.size();
}

void tcp_svr::do_accept() {
    m_acceptor.async_accept(boost::asio::make_strand(m_io_service),
                            [this](boost::system::error_code err, boost_tcp::socket socket) {
                                if (m_stopped) {
                                    return;
                                }

                                if (!err) {
                                    std::lock_guard lock(m_sessions_mutex);
                                    m_sessions.push_back(std::make_shared<tcp_session>(std::move(socket)));
                                }

                                do_accept();
                            });
}

// tcp_session.cpp
tcp_session::tcp_session(boost_tcp::socket socket)
    : m_socket(std::move(socket))
    , m_strand(socket.get_executor()) {}

tcp_session::~tcp_session() { close(); }

void tcp_session::async_write(boost::asio::const_buffer buff) {
    boost::asio::post(m_strand, [this, buff = buff]() {
        std::lock_guard lock(m_socket_mutex);
        boost::asio::async_write(m_socket, buff,
                                 [this, self = shared_from_this()](const boost::system::error_code& ec,
                                                                   std::size_t bytes_transferred) {
                                     handle_async_write(ec, bytes_transferred);
                                 });
    });
}

void tcp_session::async_write(char const* data, size_t size) { async_write(boost::asio::buffer(data, size)); }

void tcp_session::close() {
    std::lock_guard lock(m_socket_mutex);
    if (m_socket.is_open()) {
        m_socket.close();
    }
}

void tcp_session::handle_async_write(boost::system::error_code const& err,
                                     [[maybe_unused]] size_t          bytes_transferred) {
    if (err) {
        // log
    }
}

class TcpFixture : public testing::Test {
  protected:
    boost::asio::io_service  m_client_io_service{};
    std::unique_ptr<tcp_svr> m_server;
    std::thread              m_client_io_service_thread;
    std::thread              m_server_thread;

    void SetUp() override {
        int16_t port               = 1234;
        m_server                   = std::make_unique<tcp_svr>(port);
        m_client_io_service_thread = std::thread([&]() { m_client_io_service.run(); });
        m_server_thread            = std::thread([&]() { m_server->run(); });
    }

    void TearDown() override {
        m_client_io_service.stop();
        m_client_io_service_thread.join();
        m_server->stop();
        m_server_thread.join();
    }
};

TEST_F(TcpFixture, NoInterleavingAsyncWrite) {
    tcp::socket client_socket(m_client_io_service);
    client_socket.connect(m_server->get_local_endpoint());

    static constexpr int    kMessageCount = 5;
    static constexpr size_t kDataSize     = 65536;

    std::array<std::array<int, kDataSize>, kMessageCount> data{};
    for (int i = 0; i < kMessageCount; i++) {
        std::fill(data.at(i).begin(), data.at(i).end(), i + 1);
    }

    while (m_server->get_session_count() != 1) {
    }

    for (auto const& arr : data) {
        m_server->async_write(boost::asio::buffer(arr));
    }

    for (int i = 0; i < kMessageCount; i++) {
        std::array<int, kDataSize> read_buffer{};
        boost::asio::read(client_socket, boost::asio::buffer(read_buffer));

        std::map<int, int> value_count{};
        for (int val : read_buffer) {
            value_count[val]++;
        }

        EXPECT_EQ(value_count.size(), 1);
    }
}

This prints, for example:

Running main() from ./googletest/src/gtest_main.cc
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from TcpFixture
[ RUN      ] TcpFixture.NoInterleavingAsyncWrite
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
  value_count.size()
    Which is: 4
  1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
  value_count.size()
    Which is: 4
  1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
  value_count.size()
    Which is: 4
  1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
  value_count.size()
    Which is: 4
  1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
  value_count.size()
    Which is: 4
  1
[  FAILED  ] TcpFixture.NoInterleavingAsyncWrite (102 ms)
[----------] 1 test from TcpFixture (102 ms total)

[----------] Global test environment tear-down
[==========] 1 test from 1 test suite ran. (102 ms total)
[  PASSED  ] 0 tests.
[  FAILED  ] 1 test, listed below:
[  FAILED  ] TcpFixture.NoInterleavingAsyncWrite

 1 FAILED TEST
Tags: c++, boost-asio, googletest
1 Answer

You are confusing yourself about the strands.

You don't need the mutexes. And when you associate a strand executor with the accepted socket, you don't need another strand. Just use m_socket.get_executor().

Also, strands do prevent concurrent execution of the (intermediate) handlers. They do not prevent concurrent initiation of overlapping async operations; it's just that the initiations themselves won't run concurrently. ¯\_(ツ)_/¯
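For example, even when sock uses a strand executor, nothing stops the second composed write below from starting before the first one has completed (demo is a hypothetical function for illustration):

#include <boost/asio.hpp>

// Both initiations are serialized by the strand, but the first composed write
// is still in flight when the second one starts, so their async_write_some
// chunks may interleave on the wire.
void demo(boost::asio::ip::tcp::socket& sock, boost::asio::const_buffer a,
          boost::asio::const_buffer b) {
    boost::asio::async_write(sock, a, [](boost::system::error_code, std::size_t) {});
    boost::asio::async_write(sock, b, [](boost::system::error_code, std::size_t) {});
}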

Besides, your whole operation is single-threaded, so the strands are fairly redundant.

If you want to avoid interleaving of composed async write operations, you have to serialize those operations, e.g. by queueing them, as in the sketch below.
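In miniature, the queueing pattern looks like this (a sketch assuming send() is only ever invoked on the socket's strand and the writer outlives all pending writes; queued_writer is a made-up name, and the Session class further down is the complete version):

#include <boost/asio.hpp>
#include <deque>

struct queued_writer {
    boost::asio::ip::tcp::socket&         sock;
    std::deque<boost::asio::const_buffer> outbox;

    void send(boost::asio::const_buffer b) { // call on the strand only
        outbox.push_back(b);
        if (outbox.size() == 1) // no async_write currently in flight
            write_next();
    }

    void write_next() {
        boost::asio::async_write( //
            sock, outbox.front(), [this](boost::system::error_code ec, std::size_t) {
                outbox.pop_front();
                if (!ec && !outbox.empty())
                    write_next(); // start the next queued write
            });
    }
};

Note that asio::const_buffer does not own its memory, so whoever calls send() must keep the underlying bytes alive until the corresponding write completes (the test below does this by keeping data in scope).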

On top of that, there are session lifetime issues (sessions never go away, because your svr keeps shared ownership). More seriously, the async operation posted to the strand fails to capture shared_from_this.

You also don't handle any errors (including EOF) on the client side.

I took the liberty of fixing all of the above, plus some unrelated improvements. Here is the test case, working exactly as expected:

Live On Coliru

#include <boost/asio.hpp>

#include <algorithm>
#include <array>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <vector>

namespace Tcp {
    namespace asio = boost::asio;
    using tcp      = asio::ip::tcp;
    using boost::system::error_code;

    struct Session : std::enable_shared_from_this<Session> {
        Session(tcp::socket socket) : m_socket(std::move(socket)) {}

        void start() {
            // keep a running read so we can detect errors and keep the session alive without the server
            // owning the lifetime
            post(m_socket.get_executor(), std::bind(&Session::do_read_loop, shared_from_this()));
        }

        void send(asio::const_buffer buff) {
            post(m_socket.get_executor(), [this, self = shared_from_this(), buff]() {
                outbox_.push_back(std::move(buff));
                if (outbox_.size() == 1)
                    do_write_loop();
            });
        }

        void close() {
            post(m_socket.get_executor(), std::bind(&Session::do_close, shared_from_this()));
        }

      private:
        tcp::socket                    m_socket;
        std::array<char, 16>           incoming_;
        std::deque<asio::const_buffer> outbox_;

        void do_read_loop() { // on strand
            m_socket.async_read_some(asio::buffer(incoming_),
                                     [this, self = shared_from_this()](error_code ec, size_t) {
                                         if (!ec)
                                             do_read_loop();
                                     });
        }

        void do_write_loop() { // on strand
            if (outbox_.empty())
                return;

            async_write(m_socket, outbox_.front(), //
                        [this, self = shared_from_this()](error_code ec, [[maybe_unused]] size_t n) {
                            if (!ec) {
                                outbox_.pop_front();
                                do_write_loop();
                            }
                        });
        }

        void do_close() { // on strand
            if (m_socket.is_open())
                m_socket.close();
        }
    };

    class server {
      public:
        server(asio::any_io_executor ex, uint16_t port) //
            : ex_(ex)
            , acc_(make_strand(ex), {{}, port}) {
            do_accept();
        }

        ~server() { stop(); }

        void send(asio::const_buffer buff) {
            for (auto& handle : sessions_) {
                if (auto sess = handle.lock())
                    sess->send(buff);
            }
        }

        void stop() {
            post(acc_.get_executor(), [this] {
                for (auto& handle : sessions_) {
                    if (auto sess = handle.lock())
                        sess->close();
                }

                sessions_.clear();
                acc_.cancel();
            });
        }

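    // Hand the read of sessions_ off to the acceptor's strand and block on the
    // std::future returned by asio::use_future, so this getter does not race
    // with the accept handler that mutates sessions_.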
        size_t get_session_count() {
            return asio::post(              //
                       acc_.get_executor(), //
                       asio::use_future([this] { return sessions_.size(); }))
                .get();
        }
        tcp::endpoint local_endpoint() { return acc_.local_endpoint(); }

      private:
        using session_ptr = std::shared_ptr<Session>;
        using handle      = std::weak_ptr<Session>;
        void do_accept() {
            acc_.async_accept(asio::make_strand(ex_), [this](error_code err, tcp::socket socket) {
                if (!err) {
                    do_accept();

                    auto sess = std::make_shared<Session>(std::move(socket));
                    sessions_.push_back(sess);
                    sess->start();
                }
            });
        }

        asio::any_io_executor ex_;
        tcp::acceptor         acc_;
        std::list<handle>     sessions_{};
    };

} // namespace Tcp

#include <gtest/gtest.h>

class TcpFixture : public testing::Test {
  protected:
    boost::asio::thread_pool ioc_{1};
    uint16_t                 port_ = 1234;
    Tcp::server              svr_{ioc_.get_executor(), port_};

    void SetUp() override {}

    void TearDown() override {
        svr_.stop();
        ioc_.join(); // optionally wait
    }
};

TEST_F(TcpFixture, NoInterleavingAsyncWrite) {
    boost::asio::ip::tcp::socket client_socket(ioc_);
    client_socket.connect(svr_.local_endpoint());

    constexpr size_t LEN = 65536;
    using Message        = std::array<int, LEN>;
    std::vector<Message> data(5);

    for (int i = 0; auto& msg : data)
        std::fill(msg.begin(), msg.end(), ++i);

    for (auto const& msg : data)
        svr_.send(boost::asio::buffer(msg));

    size_t total_received = 0;
    for ([[maybe_unused]] auto&& _ : data) {
        Message msg;

        boost::system::error_code ec;
        auto n = read(client_socket, boost::asio::buffer(msg), ec);

        if (ec == boost::asio::error::eof) {
            if (n == 0) // error condition without (partial) read success
                break;
        } else {
            EXPECT_EQ(n, sizeof(Message));
            EXPECT_FALSE(ec.failed());
        }

        if (n) {
            ++total_received;
            EXPECT_EQ(n, sizeof(Message));
            EXPECT_EQ(msg.size(), std::count(msg.begin(), msg.end(), msg.front()));
        }
    }

    EXPECT_EQ(data.size(), total_received);
}

