asio TCP 客户端在异步时中断

问题描述 投票:0回答:1

我目前正在尝试从虚幻引擎 5 与我们公司运行的现有软件进行通信。该软件接受 TCP 连接。

因为它似乎是 C++ 事实上的标准,所以我想使用 asio(独立,没有 boost)。根据一些在线教程和帖子,我成功创建了一个同步连接然后异步等待消息的工作客户端。只要我启动客户端时服务器应用程序已经在运行,这就可以工作。

//IOSocket.h (excluding function declarations)
static constexpr int BUFFER_SIZE = 4096;
static constexpr int MAX_CONNECTION_ATTEMPTS = 3;

std::array<char, BUFFER_SIZE> _buffer;
asio::io_context _context;
asio::ip::tcp::endpoint _endpoint;
asio::ip::tcp::socket _socket;
std::queue<std::string> _outMessageQueue;

//IOSocket.cpp
IOSocket::IOSocket(const std::string& hostname, const uint16_t& port) : _buffer(), _socket(_context) {

    asio::ip::address ip;
    if (hostname == "localhost" || hostname == "loopback") {
        ip = asio::ip::address_v4::loopback();
    } else {
        ip = asio::ip::address::from_string(hostname);
    }

    _endpoint = asio::ip::tcp::endpoint(ip, port);

    asio::io_context::work idle(_context);
    _contextThread = std::thread([this]() {
        _context.run();
    });

    Connect();
}

IOSocket::~IOSocket() {
    Disconnect(false);

    _context.stop();
    if (_contextThread.joinable()) {
        _contextThread.join();
    }
}

void IOSocket::Connect() {
    Disconnect();

    //open TCP socket
    asio::error_code error;
    _socket.connect(_endpoint, error);

    if (error) {
        Disconnect();
        return;
    }

    for (; !_outMessageQueue.empty(); _outMessageQueue.pop()) {
        SendMessage(_outMessageQueue.front());
    }

    //start receive
    ReceiveData();
}

void IOSocket::Disconnect() {
    if (_socket.is_open()) {
        asio::error_code error;
        _socket.shutdown(asio::socket_base::shutdown_both, error);
        _socket.close(error);
    }
}

void IOSocket::ReceiveData() {
    _socket.async_read_some(asio::buffer(_buffer), [&](asio::error_code error, std::size_t receviedBytes) {
        if (!error) {
            const std::vector<std::string> msgs = Utilities::String::Split(_buffer.data(), '*');

            for (const std::string& msg : msgs) {
                //Message gets handled by the owner of the IOSocket
            }
            ReceiveData();
        }
    });
}

void IOSocket::SendMessage(const std::string& message) {
    if (_socket.is_open()) {
        write(_socket, asio::buffer(message));
    } else {
        _outMessageQueue.push(message);
    }
}

但是,我希望客户端在开始时尝试连接一段时间(间隔十秒尝试 3 次),当客户端已经建立连接并且服务器消失时,我希望客户端无限期地尝试重新连接直到服务器再次恢复。因此,我修改了 Connect 和 Disconnect 函数以使用异步辅助函数重新连接:

//IOSocket.h additions
std::thread _contextThread;
std::future<void> _waitForReconnect;

//IOSocket.cpp
void IOSocket::Connect(int remainingAttempts) {
    Disconnect(false);

    //open TCP socket
    asio::error_code error;
    _socket.connect(_endpoint, error);

    if (error) {
        remainingAttempts--;
        if (remainingAttempts >= 0) {
            int currentAttempt = MAX_CONNECTION_ATTEMPTS - remainingAttempts;
            Debug::LogWarning(std::format("Connection attempt {0}/{1} failed: {2}", currentAttempt, MAX_CONNECTION_ATTEMPTS, error.message()));
            Disconnect(remainingAttempts > 0, remainingAttempts);
        } else {
            Debug::LogWarning(std::format("Connection attempt failed: {0}", error.message()));
            Disconnect(true);
        }

        return;
    }

    Debug::Log(std::format("Connected to Solid at {0}:{1}", _endpoint.address().to_string(), _endpoint.port()));

    for (; !_outMessageQueue.empty(); _outMessageQueue.pop()) {
        SendMessage(_outMessageQueue.front());
    }

    //start receive
    ReceiveData();
}

void IOSocket::Disconnect(bool tryReconnect, int remainingAttempts) {
    if (_socket.is_open()) {
        asio::error_code error;
        _socket.shutdown(asio::socket_base::shutdown_both, error);
        _socket.close(error);
    }

    if (tryReconnect) {
        _waitForReconnect = Utilities::Delay(10000, [this](int n) { Connect(n); }, remainingAttempts);
    }
}

//Utilities.h
template<typename Fn, typename... Args>
static auto Delay(uint32_t delayMilliseconds, Fn&& fn, Args&&... args) {
    return std::async(std::launch::async, [=](){
        if (delayMilliseconds > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMilliseconds));
        }
        fn(args...);
    });
}

几乎有效。我已建立连接并且可以发送消息,但我没有收到任何消息。如果我在任何延迟函数中调试并设置断点,io_context 将“停止”并且“outstanding_work”为 0。即使我重新启动它并给它新的“假”工作要做,我仍然无法让它处理传入的工作消息。 如果我同步进行初始连接尝试(并且服务器正在运行),这仍然会产生有效连接,但重新连接不会,并且如果我异步进行初始尝试,它会显示与重新连接相同的行为。通过我的 Delay 函数、显式 std::thread 或手动 std::async 调用来调用它没有区别。

我对asio很陌生,我已经有近十年没有写过C++代码了(我通常使用C#),所以我确信我做错了很多,但我就是不明白它是什么是。

c++ asynchronous unreal-engine5 asio
1个回答
0
投票

从您发布的代码来看,您似乎使事情变得过于复杂了。这是一个简单的客户端,每 10 秒尝试连接到服务器 3 次,一旦失去连接,它将无限期地重新尝试。请注意,这里不涉及线程——只有异步操作:

#include <asio.hpp>
#include <chrono>
#include <iostream>

using asio::ip::tcp;
using namespace std::chrono_literals;

struct client
{
    explicit client(asio::io_context& ctx) :
        socket{ctx}, endpoint{asio::ip::address_v4::loopback(), 8080}, timer{ctx}
    { connect_in(0s); }

private:
    tcp::socket socket;
    tcp::endpoint endpoint;
    asio::steady_timer timer;

    std::string data;

    int ttl = 3;
    void connect_in(std::chrono::seconds s)
    {
        if (ttl)
        {
            timer.expires_after(s);
            timer.async_wait([&](asio::error_code ec)
            {
                std::cout << "connect... " << std::flush;

                if (!ec) socket.connect(endpoint, ec);
                if (!ec)
                {
                    std::cout << "OK" << std::endl;
                    ttl = -1;
                    recv_data();
                }
                else
                {
                    std::cout << "FAILED" << std::endl;
                    if (ttl > 0) --ttl; // maybe negative
                    connect_in(10s);
                }
            });
        }
        else std::cout << "ABANDON" << std::endl;
    }

    void recv_data()
    {
        asio::async_read_until(socket, asio::dynamic_buffer(data), '*', [&](asio::error_code ec, std::size_t n)
        {
            if (!ec)
            {
                auto line = data.substr(0, n);
                line.pop_back();
                data.erase(0, n);

                std::cout << line << std::endl;
                recv_data();
            }
            else
            {
                std::cout << "disconnect" << std::endl;
                socket.close();
                connect_in(10s);
            }
        });
    }
};

int main()
{
    asio::io_context ctx;
    client client{ctx};
    ctx.run();
}

由于我无法访问 UE 服务器,因此这里有一个简单的服务器,用于侦听连接并每隔几秒发送“ping”消息:

#include <asio.hpp>
#include <chrono>
#include <iostream>

using asio::ip::tcp;
using namespace std::chrono_literals;

struct session
{
    explicit session(tcp::socket s) :
        socket{std::move(s)}, timer{socket.get_executor()}
    {
        std::cout << this << ": create" << std::endl;
        send();
    }

    ~session() { std::cout << this << ": delete" << std::endl; }

private:
    tcp::socket socket;
    asio::steady_timer timer;

    void send()
    {
        timer.expires_after(2s);
        timer.async_wait([&](asio::error_code ec)
        {
            if (!ec)
            {
                std::cout << this << ": ping" << std::endl;
                socket.send(asio::buffer("ping*"), { }, ec);
                if (!ec)
                {
                    send();
                    return;
                }
            }

            delete this;
        });
    }
};

struct server
{
    server(asio::io_context& ctx) :
        acceptor{ctx, tcp::endpoint{tcp::v4(), 8080}},
        socket{ctx}
    { accept(); }

private:
    tcp::acceptor acceptor;
    tcp::socket socket;

    void accept()
    {
        acceptor.async_accept(socket, [&](asio::error_code ec)
        {
            if (!ec) new session{std::move(socket)};
            accept();
        });
    }
};

int main()
{
    asio::io_context ctx;
    server server{ctx};
    ctx.run();
};

两者的链接:https://godbolt.org/z/q9TjnP6Ws

© www.soinside.com 2019 - 2024. All rights reserved.