我一直在尝试使用
boost::asio
编写一个 TCP 服务器,它会向任意数量的已连接客户端发送数据;同时我也在尝试用 Google Test 为它编写一些测试。
我在下面的测试中遇到了一些麻烦。该测试旨在检查写入是否发生交错:先准备 5 个缓冲区,每个缓冲区用各自不同的值填满;读取时,我用 std::map 统计每个值出现的次数。
我知道
async_write
会在内部多次调用 async_write_some
,因此为了避免写入交错,通常建议使用 strand(链),因为 strand 能保证同一条 strand 上的操作按顺序执行。
但这似乎并没有生效,我不确定原因是什么。如能提供任何帮助,我将不胜感激。
#include <boost/asio.hpp>
#include <gtest/gtest.h>
using tcp = boost::asio::ip::tcp;
// tcp_session.h
using boost_tcp = boost::asio::ip::tcp;
// One client connection owned by the server.
// The class derives from enable_shared_from_this and its write completion
// handler captures shared_from_this(), so instances must be managed by
// std::shared_ptr (tcp_svr::do_accept creates them with std::make_shared).
class tcp_session : public std::enable_shared_from_this<tcp_session> {
public:
// Takes ownership of `socket` by move.
explicit tcp_session(boost_tcp::socket socket);
// Closes the socket via close().
~tcp_session();
// Starts an asynchronous write of `size` bytes starting at `data`
// (forwards to the const_buffer overload).
void async_write(char const* data, size_t size);
// Starts an asynchronous write of `buff`. const_buffer does not own its
// bytes, so presumably the caller must keep them alive until the write
// completes -- TODO confirm against callers.
void async_write(boost::asio::const_buffer buff);
// NOTE(review): no definition of either write() overload is visible in this listing.
size_t write(char const* data, size_t size);
size_t write(boost::asio::const_buffer buff);
// Closes the socket if it is open, under m_socket_mutex.
void close();
private:
// Declaration order matters: m_socket is initialised before m_strand.
boost_tcp::socket m_socket;
// Held while starting a write and while closing the socket.
std::mutex m_socket_mutex{};
// Executor that async_write() posts onto before starting each write.
boost::asio::strand<boost_tcp::socket::executor_type> m_strand;
// Completion handler for asynchronous writes.
void handle_async_write(boost::system::error_code const& err, size_t bytes_transferred);
};
// tcp_svr.h
/// TCP server that accepts connections on one port and broadcasts writes to
/// every session it has accepted.
class tcp_svr {
public:
    /// Binds the acceptor to `port` and starts accepting.
    explicit tcp_svr(int16_t port);
    /// Calls stop().
    ~tcp_svr();

    /// Broadcast `size` bytes at `data` to every session.
    /// NOTE(review): no definition of this overload is visible in this listing.
    void async_write(char const* data, size_t size);
    /// Broadcast `buff` to every session. The buffer is a non-owning view.
    void async_write(boost::asio::const_buffer buff);

    /// Runs the server's io_service on the calling thread.
    void run();
    /// Cancels accepting, closes and forgets all sessions, stops the io_service.
    void stop();

    /// Number of sessions currently held (taken under the sessions mutex).
    size_t get_session_count();
    boost_tcp::endpoint get_local_endpoint() { return m_acceptor.local_endpoint(); }

private:
    using session_ptr = std::shared_ptr<tcp_session>;

    void do_accept();

    // Must be explicitly initialised: before C++20 a default-constructed
    // std::atomic holds an indeterminate value, and both stop() and the
    // accept handler read this flag.
    std::atomic_bool m_stopped{false};
    std::vector<session_ptr> m_sessions{};
    std::mutex m_sessions_mutex{};
    // Declared before m_acceptor because the acceptor is constructed from it.
    boost::asio::io_service m_io_service{};
    boost_tcp::acceptor m_acceptor;
};
// tcp_svr.cpp
// Opens the acceptor on an IPv4 endpoint for `port` and arms the first accept.
// The signed port is converted to the unsigned port type here, exactly as the
// implicit conversion would do.
tcp_svr::tcp_svr(int16_t port)
    : m_acceptor(m_io_service, tcp::endpoint(tcp::v4(), static_cast<unsigned short>(port))) {
    do_accept();
}
// Shuts the server down on destruction; stop() returns early when the server
// was already stopped.
tcp_svr::~tcp_svr() {
    stop();
}
// Broadcasts `buff` by starting an asynchronous write on every session.
// The session list is locked for the duration of the loop.
void tcp_svr::async_write(boost::asio::const_buffer buff) {
    std::lock_guard<std::mutex> guard{m_sessions_mutex};
    for (session_ptr const& session : m_sessions) {
        session->async_write(buff);
    }
}
// Runs the server's io_service on the calling thread.
void tcp_svr::run() {
    m_io_service.run();
}
void tcp_svr::stop() {
if (m_stopped)
return;
m_stopped = true;
auto thread = std::thread{[this]() { m_acceptor.cancel(); }};
thread.join();
{
std::lock_guard lock(m_sessions_mutex);
for (auto const& client : m_sessions) {
client->close();
}
m_sessions.clear();
}
m_io_service.stop();
}
size_t tcp_svr::get_session_count() {
std::lock_guard lock(m_sessions_mutex);
return m_sessions.size();
}
void tcp_svr::do_accept() {
m_acceptor.async_accept(boost::asio::make_strand(m_io_service),
[this](boost::system::error_code err, boost_tcp::socket socket) {
if (m_stopped) {
return;
}
if (!err) {
std::lock_guard lock(m_sessions_mutex);
m_sessions.push_back(std::make_shared<tcp_session>(std::move(socket)));
}
do_accept();
});
}
// tcp_session.cpp
// Takes ownership of `socket` and builds the session strand on the socket's
// executor.
tcp_session::tcp_session(boost_tcp::socket socket)
    : m_socket(std::move(socket))
    // Read the executor from the member, not from the parameter: `socket` has
    // already been moved from at this point. m_socket is declared before
    // m_strand, so it is fully constructed here.
    , m_strand(m_socket.get_executor()) {}
// Ensures the socket is closed when the session is destroyed.
tcp_session::~tcp_session() {
    close();
}
// Starts an asynchronous write of `buff` on this session's socket.
// The work is posted to m_strand; the posted function then starts one
// boost::asio::async_write per call. Nothing in this function queues or waits
// for a previous write, so several calls in a row start several writes on the
// same socket.
// `buff` is a non-owning view that is copied into the posted function.
void tcp_session::async_write(boost::asio::const_buffer buff) {
// NOTE(review): the posted function captures only `this`, not
// shared_from_this(), so it does not keep the session alive until it runs.
boost::asio::post(m_strand, [this, buff = buff]() {
// The lock is held only while the write is being started, not until it completes.
std::lock_guard lock(m_socket_mutex);
boost::asio::async_write(m_socket, buff,
// The completion handler holds `self`, keeping the session alive until it runs.
[this, self = shared_from_this()](const boost::system::error_code& ec,
std::size_t bytes_transferred) {
handle_async_write(ec, bytes_transferred);
});
});
}
void tcp_session::async_write(char const* data, size_t size) { async_write(boost::asio::buffer(data, size)); }
void tcp_session::close() {
std::lock_guard lock(m_socket_mutex);
if (m_socket.is_open()) {
m_socket.close();
}
}
// Completion handler for asynchronous writes. It currently has no effect:
// the error branch is a placeholder for logging.
void tcp_session::handle_async_write(boost::system::error_code const& err,
                                     [[maybe_unused]] size_t bytes_transferred) {
    if (!err) {
        return;
    }
    // log
}
class TcpFixture : public testing::Test {
protected:
boost::asio::io_service m_client_io_service{};
std::unique_ptr<tcp_svr> m_server;
std::thread m_client_io_service_thread;
std::thread m_server_thread;
void SetUp() override {
int16_t port = 1234;
m_server = std::make_unique<tcp_svr>(port);
m_client_io_service_thread = std::thread([&]() { m_client_io_service.run(); });
m_server_thread = std::thread([&]() { m_server->run(); });
}
void TearDown() override {
m_client_io_service.stop();
m_client_io_service_thread.join();
m_server->stop();
m_server_thread.join();
}
};
// Sends five large messages, each filled with its own distinct value, and
// expects every message-sized read on the client to contain only one value.
TEST_F(TcpFixture, NoInterleavingAsyncWrite) {
    tcp::socket client_socket(m_client_io_service);
    client_socket.connect(m_server->get_local_endpoint());

    static constexpr int kMessageCount = 5;
    static constexpr size_t kDataSize = 65536;

    // Message k (1-based) is filled entirely with the value k.
    std::array<std::array<int, kDataSize>, kMessageCount> data{};
    int fill_value = 0;
    for (auto& message : data) {
        message.fill(++fill_value);
    }

    // Spin until the server has registered the connection.
    while (m_server->get_session_count() != 1) {
    }

    for (auto const& message : data) {
        m_server->async_write(boost::asio::buffer(message));
    }

    for (int received = 0; received < kMessageCount; ++received) {
        std::array<int, kDataSize> read_buffer{};
        boost::asio::read(client_socket, boost::asio::buffer(read_buffer));

        // Count occurrences per value; exactly one distinct value is expected.
        std::map<int, int> value_count{};
        for (int const val : read_buffer) {
            ++value_count[val];
        }
        EXPECT_EQ(value_count.size(), 1);
    }
}
输出示例如下:
Running main() from ./googletest/src/gtest_main.cc
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from TcpFixture
[ RUN ] TcpFixture.NoInterleavingAsyncWrite
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
value_count.size()
Which is: 4
1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
value_count.size()
Which is: 4
1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
value_count.size()
Which is: 4
1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
value_count.size()
Which is: 4
1
/home/sehe/Projects/stackoverflow/test.cpp:190: Failure
Expected equality of these values:
value_count.size()
Which is: 4
1
[ FAILED ] TcpFixture.NoInterleavingAsyncWrite (102 ms)
[----------] 1 test from TcpFixture (102 ms total)
[----------] Global test environment tear-down
[==========] 1 test from 1 test suite ran. (102 ms total)
[ PASSED ] 0 tests.
[ FAILED ] 1 test, listed below:
[ FAILED ] TcpFixture.NoInterleavingAsyncWrite
1 FAILED TEST
您对 strand(链)的理解有些混乱。
您不需要同时使用互斥体和 strand。既然已经把 strand 执行器关联到了 accept 得到的套接字上,就不需要再创建另一个 strand,只需使用
m_socket.get_executor()
。
此外,strand 确实会阻止(中间)处理程序并发执行,但它并不会阻止发起相互重叠的异步操作,只是这些操作不会在同一时刻被发起。¯\_(ツ)_/¯
此外,您的整个操作都是单线程的,因此 strand 其实相当多余。
如果要避免异步组合写操作(composed write)相互交错,就必须把这些操作串行化,例如使用队列。
此外,还存在会话生命周期问题(会话永远不会被销毁,因为您的 svr 一直持有共享所有权)。更严重的是,通过 post 投递到 strand 的异步操作没有捕获
shared_from_this
。
您也没有在客户端处理任何错误(包括 EOF)。
我冒昧地修复了上述所有问题,并顺带做了一些无关的改进。下面是完全按预期工作的测试用例:
Live On Coliru
#include <boost/asio.hpp>
#include <deque>
#include <list>
namespace Tcp {
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
using boost::system::error_code;
struct Session : std::enable_shared_from_this<Session> {
Session(tcp::socket socket) : m_socket(std::move(socket)) {}
void start() {
// keep a running read so we can detect errors and keep the session alive without the server
// owning the lifetime
post(m_socket.get_executor(), std::bind(&Session::do_read_loop, shared_from_this()));
}
void send(asio::const_buffer buff) {
post(m_socket.get_executor(), [this, self = shared_from_this(), buff]() {
outbox_.push_back(std::move(buff));
if (outbox_.size() == 1)
do_write_loop();
});
}
void close() {
post(m_socket.get_executor(), std::bind(&Session::do_close, shared_from_this()));
}
private:
tcp::socket m_socket;
std::array<char, 16> incoming_;
std::deque<asio::const_buffer> outbox_;
void do_read_loop() { // on strand
m_socket.async_read_some(asio::buffer(incoming_),
[this, self = shared_from_this()](error_code ec, size_t) {
if (!ec)
do_read_loop();
});
}
void do_write_loop() { // on strand
if (outbox_.empty())
return;
async_write(m_socket, outbox_.front(), //
[this, self = shared_from_this()](error_code ec, [[maybe_unused]] size_t n) {
if (!ec) {
outbox_.pop_front();
do_write_loop();
}
});
}
void do_close() { // on strand
if (m_socket.is_open())
m_socket.close();
}
};
class server {
public:
server(asio::any_io_executor ex, uint16_t port) //
: ex_(ex)
, acc_(make_strand(ex), {{}, port}) {
do_accept();
}
~server() { stop(); }
void send(asio::const_buffer buff) {
for (auto& handle : sessions_) {
if (auto sess = handle.lock())
sess->send(buff);
}
}
void stop() {
post(acc_.get_executor(), [this] {
for (auto& handle : sessions_) {
if (auto sess = handle.lock())
sess->close();
}
sessions_.clear();
acc_.cancel();
});
}
size_t get_session_count() {
return asio::post( //
acc_.get_executor(), //
asio::use_future([this] { return sessions_.size(); }))
.get();
}
tcp::endpoint local_endpoint() { return acc_.local_endpoint(); }
private:
using session_ptr = std::shared_ptr<Session>;
using handle = std::weak_ptr<Session>;
void do_accept() {
acc_.async_accept(asio::make_strand(ex_), [this](error_code err, tcp::socket socket) {
if (!err) {
do_accept();
auto sess = std::make_shared<Session>(std::move(socket));
sessions_.push_back(sess);
sess->start();
}
});
}
asio::any_io_executor ex_;
tcp::acceptor acc_;
std::list<handle> sessions_{};
};
} // namespace Tcp
#include <gtest/gtest.h>
// Test fixture: a one-thread pool runs all server I/O, and the server listens
// on a fixed port.
class TcpFixture : public testing::Test {
protected:
// Declaration order matters: svr_ is constructed from ioc_'s executor and
// port_, so both are declared before it.
boost::asio::thread_pool ioc_{1};
uint16_t port_ = 1234;
Tcp::server svr_{ioc_.get_executor(), port_};
void SetUp() override {}
void TearDown() override {
// Request shutdown first, then wait for the pool so the posted stop handler
// and the resulting completions have run.
svr_.stop();
ioc_.join(); // optionally wait
}
};
// Sends five large messages, each filled with its own distinct value, and
// checks that every message arrives whole: each message-sized read on the
// client must contain a single repeated value.
TEST_F(TcpFixture, NoInterleavingAsyncWrite) {
    boost::asio::ip::tcp::socket client_socket(ioc_);
    client_socket.connect(svr_.local_endpoint());

    // connect() can return before the server's accept handler has registered
    // the session. Broadcasting at that point would reach no one and the
    // reads below would block forever, so wait for the session to appear.
    // Each get_session_count() call blocks on a round trip through the
    // server's strand, so this is not a hot spin.
    while (svr_.get_session_count() < 1) {
    }

    constexpr size_t LEN = 65536;
    using Message = std::array<int, LEN>;

    // Message k (1-based) is filled entirely with the value k. `data` must
    // outlive the sends because the server only queues views of it.
    std::vector<Message> data(5);
    for (int i = 0; auto& msg : data)
        std::fill(msg.begin(), msg.end(), ++i);

    for (auto const& msg : data)
        svr_.send(boost::asio::buffer(msg));

    size_t total_received = 0;
    for ([[maybe_unused]] auto&& _ : data) {
        Message msg;
        boost::system::error_code ec;
        auto n = read(client_socket, boost::asio::buffer(msg), ec);

        if (ec == boost::asio::error::eof) {
            if (n == 0) // error condition without (partial) read success
                break;
        } else {
            EXPECT_EQ(n, sizeof(Message));
            EXPECT_FALSE(ec.failed());
        }

        if (n) {
            ++total_received;
            EXPECT_EQ(n, sizeof(Message));
            // Every element equals the first one, i.e. no interleaving.
            EXPECT_EQ(msg.size(), std::count(msg.begin(), msg.end(), msg.front()));
        }
    }
    EXPECT_EQ(data.size(), total_received);
}
现场演示: