我目前正在尝试从虚幻引擎 5 与我们公司运行的现有软件进行通信。该软件接受 TCP 连接。
因为它似乎是 C++ 事实上的标准,所以我想使用 asio(独立,没有 boost)。根据一些在线教程和帖子,我成功创建了一个同步连接然后异步等待消息的工作客户端。只要我启动客户端时服务器应用程序已经在运行,这就可以工作。
//IOSocket.h (excluding function declarations)
static constexpr int BUFFER_SIZE = 4096;
static constexpr int MAX_CONNECTION_ATTEMPTS = 3;
std::array<char, BUFFER_SIZE> _buffer;
asio::io_context _context;
asio::ip::tcp::endpoint _endpoint;
asio::ip::tcp::socket _socket;
std::queue<std::string> _outMessageQueue;
//IOSocket.cpp
IOSocket::IOSocket(const std::string& hostname, const uint16_t& port) : _buffer(), _socket(_context) {
asio::ip::address ip;
if (hostname == "localhost" || hostname == "loopback") {
ip = asio::ip::address_v4::loopback();
} else {
ip = asio::ip::address::from_string(hostname);
}
_endpoint = asio::ip::tcp::endpoint(ip, port);
asio::io_context::work idle(_context);
_contextThread = std::thread([this]() {
_context.run();
});
Connect();
}
IOSocket::~IOSocket() {
Disconnect(false);
_context.stop();
if (_contextThread.joinable()) {
_contextThread.join();
}
}
void IOSocket::Connect() {
Disconnect();
//open TCP socket
asio::error_code error;
_socket.connect(_endpoint, error);
if (error) {
Disconnect();
return;
}
for (; !_outMessageQueue.empty(); _outMessageQueue.pop()) {
SendMessage(_outMessageQueue.front());
}
//start receive
ReceiveData();
}
void IOSocket::Disconnect() {
if (_socket.is_open()) {
asio::error_code error;
_socket.shutdown(asio::socket_base::shutdown_both, error);
_socket.close(error);
}
}
void IOSocket::ReceiveData() {
_socket.async_read_some(asio::buffer(_buffer), [&](asio::error_code error, std::size_t receviedBytes) {
if (!error) {
const std::vector<std::string> msgs = Utilities::String::Split(_buffer.data(), '*');
for (const std::string& msg : msgs) {
//Message gets handled by the owner of the IOSocket
}
ReceiveData();
}
});
}
void IOSocket::SendMessage(const std::string& message) {
if (_socket.is_open()) {
write(_socket, asio::buffer(message));
} else {
_outMessageQueue.push(message);
}
}
但是,我希望客户端在开始时尝试连接一段时间(间隔十秒尝试 3 次),当客户端已经建立连接并且服务器消失时,我希望客户端无限期地尝试重新连接直到服务器再次恢复。因此,我修改了 Connect 和 Disconnect 函数以使用异步辅助函数重新连接:
//IOSocket.h additions
std::thread _contextThread;
std::future<void> _waitForReconnect;
//IOSocket.cpp
void IOSocket::Connect(int remainingAttempts) {
Disconnect(false);
//open TCP socket
asio::error_code error;
_socket.connect(_endpoint, error);
if (error) {
remainingAttempts--;
if (remainingAttempts >= 0) {
int currentAttempt = MAX_CONNECTION_ATTEMPTS - remainingAttempts;
Debug::LogWarning(std::format("Connection attempt {0}/{1} failed: {2}", currentAttempt, MAX_CONNECTION_ATTEMPTS, error.message()));
Disconnect(remainingAttempts > 0, remainingAttempts);
} else {
Debug::LogWarning(std::format("Connection attempt failed: {0}", error.message()));
Disconnect(true);
}
return;
}
Debug::Log(std::format("Connected to Solid at {0}:{1}", _endpoint.address().to_string(), _endpoint.port()));
for (; !_outMessageQueue.empty(); _outMessageQueue.pop()) {
SendMessage(_outMessageQueue.front());
}
//start receive
ReceiveData();
}
void IOSocket::Disconnect(bool tryReconnect, int remainingAttempts) {
if (_socket.is_open()) {
asio::error_code error;
_socket.shutdown(asio::socket_base::shutdown_both, error);
_socket.close(error);
}
if (tryReconnect) {
_waitForReconnect = Utilities::Delay(10000, [this](int n) { Connect(n); }, remainingAttempts);
}
}
//Utilities.h
template<typename Fn, typename... Args>
static auto Delay(uint32_t delayMilliseconds, Fn&& fn, Args&&... args) {
return std::async(std::launch::async, [=](){
if (delayMilliseconds > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(delayMilliseconds));
}
fn(args...);
});
}
这几乎有效。我已建立连接并且可以发送消息,但我没有收到任何消息。如果我在任何延迟函数中调试并设置断点,io_context 将“停止”并且“outstanding_work”为 0。即使我重新启动它并给它新的“假”工作要做,我仍然无法让它处理传入的工作消息。 如果我同步进行初始连接尝试(并且服务器正在运行),这仍然会产生有效连接,但重新连接不会,并且如果我异步进行初始尝试,它会显示与重新连接相同的行为。通过我的 Delay 函数、显式 std::thread 或手动 std::async 调用来调用它没有区别。
我对asio很陌生,我已经有近十年没有写过C++代码了(我通常使用C#),所以我确信我做错了很多,但我就是不明白它是什么是。
从您发布的代码来看,您似乎使事情变得过于复杂了。这是一个简单的客户端,每 10 秒尝试连接到服务器 3 次,一旦失去连接,它将无限期地重新尝试。请注意,这里不涉及线程——只有异步操作:
#include <asio.hpp>
#include <chrono>
#include <iostream>
using asio::ip::tcp;
using namespace std::chrono_literals;
struct client
{
explicit client(asio::io_context& ctx) :
socket{ctx}, endpoint{asio::ip::address_v4::loopback(), 8080}, timer{ctx}
{ connect_in(0s); }
private:
tcp::socket socket;
tcp::endpoint endpoint;
asio::steady_timer timer;
std::string data;
int ttl = 3;
void connect_in(std::chrono::seconds s)
{
if (ttl)
{
timer.expires_after(s);
timer.async_wait([&](asio::error_code ec)
{
std::cout << "connect... " << std::flush;
if (!ec) socket.connect(endpoint, ec);
if (!ec)
{
std::cout << "OK" << std::endl;
ttl = -1;
recv_data();
}
else
{
std::cout << "FAILED" << std::endl;
if (ttl > 0) --ttl; // maybe negative
connect_in(10s);
}
});
}
else std::cout << "ABANDON" << std::endl;
}
void recv_data()
{
asio::async_read_until(socket, asio::dynamic_buffer(data), '*', [&](asio::error_code ec, std::size_t n)
{
if (!ec)
{
auto line = data.substr(0, n);
line.pop_back();
data.erase(0, n);
std::cout << line << std::endl;
recv_data();
}
else
{
std::cout << "disconnect" << std::endl;
socket.close();
connect_in(10s);
}
});
}
};
int main()
{
asio::io_context ctx;
client client{ctx};
ctx.run();
}
由于我无法访问 UE 服务器,因此这里有一个简单的服务器,用于侦听连接并每隔几秒发送“ping”消息:
#include <asio.hpp>
#include <chrono>
#include <iostream>
using asio::ip::tcp;
using namespace std::chrono_literals;
struct session
{
explicit session(tcp::socket s) :
socket{std::move(s)}, timer{socket.get_executor()}
{
std::cout << this << ": create" << std::endl;
send();
}
~session() { std::cout << this << ": delete" << std::endl; }
private:
tcp::socket socket;
asio::steady_timer timer;
void send()
{
timer.expires_after(2s);
timer.async_wait([&](asio::error_code ec)
{
if (!ec)
{
std::cout << this << ": ping" << std::endl;
socket.send(asio::buffer("ping*"), { }, ec);
if (!ec)
{
send();
return;
}
}
delete this;
});
}
};
struct server
{
server(asio::io_context& ctx) :
acceptor{ctx, tcp::endpoint{tcp::v4(), 8080}},
socket{ctx}
{ accept(); }
private:
tcp::acceptor acceptor;
tcp::socket socket;
void accept()
{
acceptor.async_accept(socket, [&](asio::error_code ec)
{
if (!ec) new session{std::move(socket)};
accept();
});
}
};
int main()
{
asio::io_context ctx;
server server{ctx};
ctx.run();
};