class TCPClient
{
public:
TCPClient(std::string& IP, std::string& PORT);
~TCPClient();
void push_request(std::string& request);
void connect();
private:
void start_read();
void process_write();
asio::io_context context;
asio::io_context::work work;
std::shared_ptr<tcp::socket> socket;
std::shared_ptr<asio::streambuf> buffer;
std::queue<std::string> requests;
const std::string delimeter = "\n";
std::thread io_thread;
tcp::resolver resolver;
tcp::resolver::results_type endpoints;
};
int main()
{
OpenSSL_add_all_algorithms();
ERR_load_crypto_strings();
std::thread gui_thread;
try
{
std::string IP = "IP";
std::string PORT = "PORT";
TCPClient client(IP, PORT);
gui_thread = std::thread([&client] {
if (!std::filesystem::exists("public.pem"))
{
std::string request = "key";
client.push_request(request);
}
std::unique_lock<std::mutex> lock(options.mutex);
options.condition.wait(lock, [] { return std::filesystem::exists("public.pem"); });
login_register_window(client);
if (options.is_logged && options.is_retrieved)
{
main_window(client);
}
else
{
std::cout << "Application should stop\n";
client.~TCPClient();
}
}
);
client.connect();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << '\n';
}
gui_thread.join();
return 0;
}
TCPClient::TCPClient(std::string& IP, std::string& PORT)
: socket(std::make_shared<tcp::socket>(context)),
buffer(std::make_shared<asio::streambuf>()), resolver(context),
endpoints(resolver.resolve(IP, PORT)), work(context)
{ }
TCPClient::~TCPClient()
{
context.stop();
io_thread.join();
}
void TCPClient::connect()
{
asio::async_connect(*socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
if (ec)
{
std::cerr << "Error connecting: " << ec.message() << '\n';
}
});
start_read();
io_thread = std::thread([this]() { context.run(); });
}
void TCPClient::push_request(std::string& request)
{
bool is_queue_empty = false;
{
std::unique_lock<std::mutex> lock(options.mutex);
is_queue_empty = requests.empty();
requests.push(request);
}
options.condition.notify_one();
if (is_queue_empty)
process_write();
}
void TCPClient::process_write()
{
asio::post(context, [this]() {
std::unique_lock<std::mutex> lock(options.mutex);
if (!requests.empty()) {
std::string request = requests.front();
requests.pop();
bool is_queue_empty = requests.empty();
lock.unlock();
std::string request_size = std::to_string(request.size()) + delimeter;
asio::async_write(*socket, asio::buffer(request_size),
[this, request = std::move(request), is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
if (!ec) {
std::cout << "Request size (" << bytes << " bytes) was sent.\n";
asio::async_write(*socket, asio::buffer(request),
[this, is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
if (!ec) {
std::cout << "Request (" << bytes << " bytes) was sent.\n";
if (!is_queue_empty)
process_write(); // Initiate the next write operation
}
else {
std::cerr << "Error sending request: " << ec.message() << '\n';
}
});
}
else {
std::cerr << "Error sending request size: " << ec.message() << '\n';
}
});
}
});
}
void TCPClient::start_read()
{
asio::async_read_until(*socket, *buffer, delimeter,
[this](const std::error_code& ec, std::size_t bytes) {
if (!ec) {
std::istream input_stream(buffer.get());
std::string response_size_str;
std::getline(input_stream, response_size_str);
buffer->consume(bytes);
response_size_str = response_size_str.substr(0, response_size_str.find(delimeter));
int response_size = std::stoi(response_size_str);
asio::async_read(*socket, asio::buffer(buffer->prepare(response_size), response_size),
[this](const std::error_code& ec, std::size_t bytes) {
if (!ec)
{
std::istream input_stream(buffer.get());
std::string response;
std::getline(input_stream, response);
buffer->consume(bytes);
if (options.is_logged)
{
handle_response(response);
}
else
{
std::cout << "Response received: " << response << '\n';
handle_login(response);
}
start_read();
}
else
{
std::cerr << "Error getting response: " << ec.message() << '\n';
}
});
}
else {
std::cerr << "Error getting response size: " << ec.message() << '\n';
}
});
}
我检查了 boost::asio 提供的示例,但它对我不起作用,我在网上搜索了一些论坛,但仍然没有解决我的问题
try
块的末尾被破坏。没有人会等待任何事情,析构函数
stop
就是 io 上下文。您可以使用调试器看到这一点,或者例如添加一些跟踪:
~TCPClient() {
std::cout << "PROOF OF " << __FUNCTION__ << std::endl;
context.stop();
io_thread.join();
}
评论备注不必要地不要使用动态分配(即使使用共享或其他智能指针时)。
不要通过引用将局部变量传递给异步操作(例如
request_size
,顺便说一句,对于该变量来说这是一个糟糕的名称)。当您在
istream
上使用
streambuf
时,不得
consume
字节,因为流提取操作会这样做!
streambuf::prepare
仅适用于需要单个固定缓冲区的异步操作(如
tcp::socket::async_read_some
)。对于组合操作(采用动态缓冲区)只需传递
streambuf
。异步操作总是立即返回,因此根据定义,它们会在完成之前返回。因此,这是行不通的:
asio::async_connect(socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
if (ec) {
std::cerr << "Error connecting: " << ec.message() <<std::endl;
}
});
start_read();
至少,start_read()
需要出现在
async_connect
的完成处理程序中。
async_connect(socket, endpoints, [this](error_code ec, tcp::endpoint) {
std::cout << "Connection: " << ec.message() << std::endl;
if (!ec)
start_read();
});
我确实同意在发布第一个异步操作后启动 io_context::run
是很好的。但是,由于您有一个
work
保护,因此它是多余的,为了清楚起见,您可能应该在构造函数中启动线程。
但是,更喜欢现代的真正的问题
work_guard
,您可以重置它,这样您就可以在析构函数中实际完成。当然,没关系,因为你强行stop
上下文。
post
写入循环。但它可能会阻塞(由于锁)并且它只是启动异步操作?也许您打算使用 io 线程来确保对成员的序列化访问。既然如此,为什么还要有互斥锁呢?只需发布
push_request
本身即可:
void push_request(std::string& request) {
post(socket.get_executor(), [this, request = std::move(request)]() mutable {
requests.push(std::move(request));
if (requests.size() == 1)
do_write_loop(); // renamed from "process_write"
});
}
另外,不要移动缓冲区:
std::string request = requests.front();
requests.pop();
这再次导致缓冲区成为不适合 async_write
的局部变量。相反,将消息留在队列中(
std::deque
对于两端的插入/删除具有参考稳定性)。合并写入:
requests.front().insert(0, std::to_string(request.size()) + delimiter);
现在整个写循环简化为
void do_write_loop() { // on the logical strand
if (requests.empty())
return;
requests.front().insert(0, std::to_string(requests.size()) + delimiter);
async_write(socket, asio::buffer(requests.front()), [this](error_code ec, size_t bytes) {
if (!ec) {
std::cout << "Request (" << bytes << " bytes) was sent." << std::endl;
requests.pop();
do_write_loop(); // Initiate the next write operation
} else {
std::cerr << "Error sending request: " << ec.message() << std::endl;
}
});
}
未解之谜consume
是多余的。然而,
istream
也是如此吗?!只是简单化,让流为你工作?
int response_size = 0;
if ((std::istream(&buffer) >> response_size).ignore(9999, delimiter).bad()) {
std::cerr << "Invalid response header" << std::endl;
return;
}
不要不必要地通过可变引用获取构造函数参数。尚不清楚等待的条件是什么。显然,在发送密钥请求后,它等待......文件神奇地出现在文件系统上。这似乎是 IPC 机制的一个相当脆弱的选择,特别是据我所知,所有同步部分都在同一个进程中。现在让我们把魔法咒语放入
do_write_loop
:
{ // magic wake up spell
std::lock_guard<std::mutex> lock(options.mutex);
options.condition.notify_one();
}
已审核列表住在科里鲁
#include <boost/asio.hpp>
#include <filesystem>
#include <iostream>
#include <queue>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code; // std::error_code; // for standalone
struct {
std::mutex mutex;
std::condition_variable condition;
std::atomic_bool is_logged = false;
std::atomic_bool is_retrieved = false;
} options;
struct TCPClient {
TCPClient(std::string const& host, std::string const& service) { //
endpoints = tcp::resolver{context}.resolve(host, service);
}
~TCPClient() {
context.stop();
io_thread.join();
}
void push_request(std::string request) {
post(socket.get_executor(), [this, request = std::move(request)]() mutable {
requests.push(std::move(request));
if (requests.size() == 1)
do_write_loop(); // renamed from "process_write"
});
}
void connect() {
async_connect(socket, endpoints, [this](error_code ec, tcp::endpoint) {
std::cout << "Connection: " << ec.message() << std::endl;
if (!ec)
start_read();
});
}
private:
// on the logical strand
void do_write_loop() {
{ // magic wake up spell
std::lock_guard<std::mutex> lock(options.mutex);
options.condition.notify_one();
}
if (requests.empty())
return;
requests.front().insert(0, std::to_string(requests.size()) + delimiter);
async_write(socket, asio::buffer(requests.front()), [this](error_code ec, size_t bytes) {
if (!ec) {
std::cout << "Request (" << bytes << " bytes) was sent." << std::endl;
requests.pop();
do_write_loop(); // Initiate the next write operation
} else {
std::cerr << "Error sending request: " << ec.message() << std::endl;
}
});
}
void handle_response(std::string) const {}
void handle_login(std::string) const { options.is_logged = true; }
void start_read() {
async_read_until(socket, buffer, delimiter, [this](error_code ec, size_t /*bytes*/) {
if (ec && ec != asio::error::eof) {
std::cerr << "Error getting response size: " << ec.message() << std::endl;
return;
}
int response_size = 0;
if ((std::istream(&buffer) >> response_size).ignore(9999, delimiter).bad()) {
std::cerr << "Invalid response header" << std::endl;
return;
}
async_read(socket, buffer, asio::transfer_exactly(response_size),
[this](error_code ec, size_t bytes) {
std::string response(buffer_cast<char const*>(buffer.data()), bytes);
buffer.consume(bytes);
if (!ec) {
if (options.is_logged) {
handle_response(response);
} else {
std::cout << "Response received: " << response << std::endl;
handle_login(response);
}
start_read();
} else {
std::cerr << "Error getting response: " << ec.message() << std::endl;
}
});
});
}
asio::io_context context;
tcp::socket socket{context};
tcp::resolver::results_type endpoints;
asio::streambuf buffer;
std::queue<std::string> requests;
static constexpr char delimiter = '\n';
asio::io_context::work work{context};
std::thread io_thread{[this] { context.run(); }};
};
void main_window(TCPClient&) {}
void login_register_window(TCPClient&) {}
#include <openssl/evp.h>
#include <openssl/err.h>
int main() try {
OpenSSL_add_all_algorithms();
ERR_load_crypto_strings();
TCPClient client("localhost", "8989");
std::thread gui_thread([&client] {
if (!std::filesystem::exists("public.pem"))
client.push_request("key");
std::unique_lock<std::mutex> lock(options.mutex);
options.condition.wait(lock, [] { return std::filesystem::exists("public.pem"); });
login_register_window(client);
if (options.is_logged && options.is_retrieved) {
main_window(client);
} else {
std::cout << "Application should stop" << std::endl;
}
});
client.connect();
gui_thread.join();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
现场演示