ASIO: continuously sending requests to a server and reading its responses over TCP with asio::async_write, asio::async_read and asio::async_read_until

I'm working on a C++ client/server application using standalone asio (no Boost). I want the client to continuously receive responses from the server, and to send requests to the server whenever it needs to. However, I don't know exactly where in the code to put asio::io_context::run so that it keeps working for the lifetime of the application. The application also uses the Dear ImGui library for the GUI, which runs in a separate thread; ASIO runs in the main thread. I noticed that some people run the io_context in another thread right after calling asio::async_connect, which initiates the first async_read operation to give ASIO some work so it doesn't stop prematurely. I tried the same technique in my code, but it still exits too early. I've included the relevant parts of my code: the main function and the client class. In this program I want to first send the size of the request followed by a designated delimiter, then the request itself; the server reads the request size with asio::async_read_until up to the delimiter, uses that size to prepare a buffer, and then reads the request itself. The server sends back the response size and the response the same way.

```cpp
class TCPClient {
public:
    TCPClient(std::string& IP, std::string& PORT);
    ~TCPClient();
    void push_request(std::string& request);
    void connect();

private:
    void start_read();
    void process_write();

    asio::io_context context;
    asio::io_context::work work;
    std::shared_ptr<tcp::socket> socket;
    std::shared_ptr<asio::streambuf> buffer;
    std::queue<std::string> requests;
    const std::string delimeter = "\n";
    std::thread io_thread;
    tcp::resolver resolver;
    tcp::resolver::results_type endpoints;
};

int main() {
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();
    std::thread gui_thread;
    try {
        std::string IP = "IP";
        std::string PORT = "PORT";
        TCPClient client(IP, PORT);
        gui_thread = std::thread([&client] {
            if (!std::filesystem::exists("public.pem")) {
                std::string request = "key";
                client.push_request(request);
            }
            std::unique_lock<std::mutex> lock(options.mutex);
            options.condition.wait(lock, [] { return std::filesystem::exists("public.pem"); });
            login_register_window(client);
            if (options.is_logged && options.is_retrieved) {
                main_window(client);
            } else {
                std::cout << "Application should stop\n";
                client.~TCPClient();
            }
        });
        client.connect();
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << '\n';
    }
    gui_thread.join();
    return 0;
}

TCPClient::TCPClient(std::string& IP, std::string& PORT)
    : socket(std::make_shared<tcp::socket>(context)),
      buffer(std::make_shared<asio::streambuf>()),
      resolver(context),
      endpoints(resolver.resolve(IP, PORT)),
      work(context) {}

TCPClient::~TCPClient() {
    context.stop();
    io_thread.join();
}

void TCPClient::connect() {
    asio::async_connect(*socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
        if (ec) {
            std::cerr << "Error connecting: " << ec.message() << '\n';
        }
    });
    start_read();
    io_thread = std::thread([this]() { context.run(); });
}

void TCPClient::push_request(std::string& request) {
    bool is_queue_empty = false;
    {
        std::unique_lock<std::mutex> lock(options.mutex);
        is_queue_empty = requests.empty();
        requests.push(request);
    }
    options.condition.notify_one();
    if (is_queue_empty)
        process_write();
}

void TCPClient::process_write() {
    asio::post(context, [this]() {
        std::unique_lock<std::mutex> lock(options.mutex);
        if (!requests.empty()) {
            std::string request = requests.front();
            requests.pop();
            bool is_queue_empty = requests.empty();
            lock.unlock();
            std::string request_size = std::to_string(request.size()) + delimeter;
            asio::async_write(*socket, asio::buffer(request_size),
                [this, request = std::move(request), is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
                    if (!ec) {
                        std::cout << "Request size (" << bytes << " bytes) was sent.\n";
                        asio::async_write(*socket, asio::buffer(request),
                            [this, is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
                                if (!ec) {
                                    std::cout << "Request (" << bytes << " bytes) was sent.\n";
                                    if (!is_queue_empty)
                                        process_write(); // Initiate the next write operation
                                } else {
                                    std::cerr << "Error sending request: " << ec.message() << '\n';
                                }
                            });
                    } else {
                        std::cerr << "Error sending request size: " << ec.message() << '\n';
                    }
                });
        }
    });
}

void TCPClient::start_read() {
    asio::async_read_until(*socket, *buffer, delimeter,
        [this](const std::error_code& ec, std::size_t bytes) {
            if (!ec) {
                std::istream input_stream(buffer.get());
                std::string response_size_str;
                std::getline(input_stream, response_size_str);
                buffer->consume(bytes);
                response_size_str = response_size_str.substr(0, response_size_str.find(delimeter));
                int response_size = std::stoi(response_size_str);
                asio::async_read(*socket, asio::buffer(buffer->prepare(response_size), response_size),
                    [this](const std::error_code& ec, std::size_t bytes) {
                        if (!ec) {
                            std::istream input_stream(buffer.get());
                            std::string response;
                            std::getline(input_stream, response);
                            buffer->consume(bytes);
                            if (options.is_logged) {
                                handle_response(response);
                            } else {
                                std::cout << "Response received: " << response << '\n';
                                handle_login(response);
                            }
                            start_read();
                        } else {
                            std::cerr << "Error getting response: " << ec.message() << '\n';
                        }
                    });
            } else {
                std::cerr << "Error getting response size: " << ec.message() << '\n';
            }
        });
}
```
I checked the examples that boost::asio provides, but they didn't work for me, and I searched some forums online but still haven't solved my problem.

c++ sockets boost-asio asio
1 Answer
The biggest problem is that the `TCPClient` is destructed at the end of the `try` block. Nobody waits for anything, and the destructor `stop`s the io context. You can see this with a debugger, or e.g. by adding some tracing:

```cpp
~TCPClient() {
    std::cout << "PROOF OF " << __FUNCTION__ << std::endl;
    context.stop();
    io_thread.join();
}
```
Review notes

"Never" invoke destructors manually.

Don't use dynamic allocation unnecessarily (even when using shared or other smart pointers).

Don't pass local variables by reference into async operations (like `request_size`, which is a bad name for that variable, by the way).

When you use an `istream` on the `streambuf`, you must NOT also `consume` the bytes, because the stream extraction operations already do that!

`streambuf::prepare` is only for async operations that require a single fixed buffer (like `tcp::socket::async_read_some`). For composed operations (which take a dynamic buffer), just pass the `streambuf`.
Async operations always return immediately, so by definition they return before completing. Therefore, this cannot work:

```cpp
asio::async_connect(socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
    if (ec) {
        std::cerr << "Error connecting: " << ec.message() << std::endl;
    }
});
start_read();
```
At the very least, `start_read()` needs to happen inside the completion handler of `async_connect`:

```cpp
async_connect(socket, endpoints, [this](error_code ec, tcp::endpoint) {
    std::cout << "Connection: " << ec.message() << std::endl;
    if (!ec)
        start_read();
});
```
I do agree that it is nice to start `io_context::run` after posting a first async operation. However, since you have a `work` guard, that is redundant, and for clarity you should probably start the thread in the constructor.

However, prefer the modern `work_guard`, which you can `reset()` so you can actually finish in the destructor. Of course it doesn't matter here, because you forcefully `stop` the context anyway.

The real problem: you `post` the write loop. But it can block (due to the lock), and it only *initiates* the async operation anyway? Perhaps you intended to use the io thread to serialize access to the members. In that case, why have the mutex at all? Just post `push_request` itself:

```cpp
void push_request(std::string& request) {
    post(socket.get_executor(), [this, request = std::move(request)]() mutable {
        requests.push(std::move(request));
        if (requests.size() == 1)
            do_write_loop(); // renamed from "process_write"
    });
}
```
Also, don't move the buffer out:

```cpp
std::string request = requests.front();
requests.pop();
```
That, again, makes the buffer a local variable, which is unsuitable for `async_write`. Instead, leave the message in the queue (`std::deque` has reference stability for insertion/removal at either end).

Combine the writes:

```cpp
// prefix the frame with its payload size (computed before the insert runs)
requests.front().insert(0, std::to_string(requests.front().size()) + delimiter);
```
Now the entire write loop simplifies to:

```cpp
void do_write_loop() { // on the logical strand
    if (requests.empty())
        return;

    requests.front().insert(0, std::to_string(requests.front().size()) + delimiter);

    async_write(socket, asio::buffer(requests.front()), [this](error_code ec, size_t bytes) {
        if (!ec) {
            std::cout << "Request (" << bytes << " bytes) was sent." << std::endl;
            requests.pop();
            do_write_loop(); // Initiate the next write operation
        } else {
            std::cerr << "Error sending request: " << ec.message() << std::endl;
        }
    });
}
```
Remaining mysteries

As noted above, the `consume` is redundant. But so is the `istream`?! Just simplify, letting the stream do the work for you:

```cpp
int response_size = 0;
if ((std::istream(&buffer) >> response_size).ignore(9999, delimiter).bad()) {
    std::cerr << "Invalid response header" << std::endl;
    return;
}
```
Don't take constructor arguments by mutable reference unnecessarily.

It is unclear what the condition is waiting for. Apparently, after sending the key request, it waits for... a file to magically appear on the filesystem. That seems a rather fragile choice of IPC mechanism, especially since, as far as I can tell, all the synchronizing parts are in the same process. For now, let's put the magic incantation in `do_write_loop`:

```cpp
{ // magic wake up spell
    std::lock_guard<std::mutex> lock(options.mutex);
    options.condition.notify_one();
}
```
Reviewed Listing

Live On Coliru

```cpp
#include <boost/asio.hpp>
#include <filesystem>
#include <iostream>
#include <queue>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code; // std::error_code; // for standalone

struct {
    std::mutex mutex;
    std::condition_variable condition;
    std::atomic_bool is_logged = false;
    std::atomic_bool is_retrieved = false;
} options;

struct TCPClient {
    TCPClient(std::string const& host, std::string const& service) {
        // endpoints = tcp::resolver{context}.resolve(host, service);
    }

    ~TCPClient() {
        context.stop();
        io_thread.join();
    }

    void push_request(std::string request) {
        post(socket.get_executor(), [this, request = std::move(request)]() mutable {
            requests.push(std::move(request));
            if (requests.size() == 1)
                do_write_loop(); // renamed from "process_write"
        });
    }

    void connect() {
        async_connect(socket, endpoints, [this](error_code ec, tcp::endpoint) {
            std::cout << "Connection: " << ec.message() << std::endl;
            if (!ec)
                start_read();
        });
    }

  private:
    // on the logical strand
    void do_write_loop() {
        { // magic wake up spell
            std::lock_guard<std::mutex> lock(options.mutex);
            options.condition.notify_one();
        }
        if (requests.empty())
            return;

        requests.front().insert(0, std::to_string(requests.front().size()) + delimiter);

        async_write(socket, asio::buffer(requests.front()), [this](error_code ec, size_t bytes) {
            if (!ec) {
                std::cout << "Request (" << bytes << " bytes) was sent." << std::endl;
                requests.pop();
                do_write_loop(); // Initiate the next write operation
            } else {
                std::cerr << "Error sending request: " << ec.message() << std::endl;
            }
        });
    }

    void handle_response(std::string) const {}
    void handle_login(std::string) const { options.is_logged = true; }

    void start_read() {
        async_read_until(socket, buffer, delimiter, [this](error_code ec, size_t /*bytes*/) {
            if (ec && ec != asio::error::eof) {
                std::cerr << "Error getting response size: " << ec.message() << std::endl;
                return;
            }
            int response_size = 0;
            if ((std::istream(&buffer) >> response_size).ignore(9999, delimiter).bad()) {
                std::cerr << "Invalid response header" << std::endl;
                return;
            }

            async_read(socket, buffer, asio::transfer_exactly(response_size),
                       [this](error_code ec, size_t bytes) {
                           std::string response(buffer_cast<char const*>(buffer.data()), bytes);
                           buffer.consume(bytes);
                           if (!ec) {
                               if (options.is_logged) {
                                   handle_response(response);
                               } else {
                                   std::cout << "Response received: " << response << std::endl;
                                   handle_login(response);
                               }
                               start_read();
                           } else {
                               std::cerr << "Error getting response: " << ec.message() << std::endl;
                           }
                       });
        });
    }

    asio::io_context context;
    tcp::socket socket{context};
    tcp::resolver::results_type endpoints;
    asio::streambuf buffer;
    std::queue<std::string> requests;
    static constexpr char delimiter = '\n';

    asio::io_context::work work{context};
    std::thread io_thread{[this] { context.run(); }};
};

void main_window(TCPClient&) {}
void login_register_window(TCPClient&) {}

#include <openssl/evp.h>
#include <openssl/err.h>

int main() try {
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();

    TCPClient client("localhost", "8989");

    std::thread gui_thread([&client] {
        if (!std::filesystem::exists("public.pem"))
            client.push_request("key");

        std::unique_lock<std::mutex> lock(options.mutex);
        options.condition.wait(lock, [] { return std::filesystem::exists("public.pem"); });

        login_register_window(client);
        if (options.is_logged && options.is_retrieved) {
            main_window(client);
        } else {
            std::cout << "Application should stop" << std::endl;
        }
    });

    client.connect();
    gui_thread.join();
} catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << std::endl;
}
```
Live demo
