async_write 仅在服务器关闭后发送

问题描述 投票:0回答:1

我尝试通过 async_write 发送消息,但它们仅在我关闭服务器(ctrl-c)后发送

例如: 作为客户端,我发送“test”和“test2”,只有在关闭服务器后客户端才会收到“testtest2”

我正在进行一个聊天,该聊天接受来自用户的消息(成功)并且必须将其广播给每个人

消息发送代码

void Server::writeHandler(int id, boost::system::error_code error){
    if (!error){
        std::cout << "[DEBUG] message broadcasted ";
    } else {
        close_connection(id);
    }
}
void Server::broadcast(std::string msg, boost::system::error_code error){
    for (auto& user : m_users){ // for every user in unordered map of users
        asio::async_write(user.second->socket, asio::buffer(msg, msg.size()),
            std::bind(&Server::writeHandler, this, user.first, std::placeholders::_1));
    }
}

onMessage 中的广播调用

void Server::onMessage(int id, boost::system::error_code error){
    if (!error){
        broadcast(m_read_msg, error); // char m_read_msg[PACK_SIZE] // PACK_SIZE = 512
        asio::async_read(m_users[id].get()->socket, asio::buffer(m_read_msg, PACK_SIZE), // PACK_SIZE = 512
            std::bind(&Server::onMessage, this, id, std::placeholders::_1));
    } else {
        close_connection(id);
    }
}

服务器运行功能:

void Server::run(int port){
    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    std::cout << "[DEBUG] binded on " << port << std::endl;

    // acceptor initialization
    m_acceptor = std::make_shared<asio::ip::tcp::acceptor>(m_io, endpoint);
    
    // start to listen for connections
    listen();

    // run the io_service in thread
    io_thread = std::thread( [&]{ m_io.run(); } ); // m_io = io_serivce

    while (true){
    }
}

void Server::listen(){
    std::shared_ptr<User> pUser(new User(m_io));
    m_acceptor->async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
}
c++ networking c++17 boost-asio asio
1个回答
0
投票

无限循环是 C++ 中的UB

您的

std::string msg
是本地(参数),将其传递给异步操作 (
async_write
) 也是 UB。

启动一个线程,紧接着无限循环是没有用的,除非目的是浪费大量CPU资源。所以,将其替换为

 m_io.run();

接下来,代码远非独立,并且缺乏各种错误处理方式。这是我想象的最小完成度,修复了已经提到的 UB:

#include <boost/asio.hpp>
#include <iostream>
#include <map>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;

struct User {
    User(asio::io_service& io) : socket(io) {}

    tcp::socket socket;
};

struct Server {
    void run(uint16_t port) {
        m_acceptor = {m_io, {{}, port}};
        std::cout << "[DEBUG] bound on " << port << std::endl;

        listen();

        m_io.run();
    }

    void close_connection(int) {}
    std::map<int, std::shared_ptr<User>> m_users;

    void writeHandler(int id, error_code error, std::shared_ptr<void>) {
        if (!error) {
            std::cout << "[DEBUG] message broadcasted ";
        } else {
            close_connection(id);
        }
    }

    void broadcast(std::string msg, boost::system::error_code /*error*/) {
        auto shared_msg = std::make_shared<std::string>(std::move(msg));
        for (auto& [id, usr] : m_users) { // for every user in unordered map of users
            asio::async_write(usr->socket, asio::buffer(*shared_msg),
                              bind(&Server::writeHandler, this, id, _1, shared_msg));
        }
    }

    void listen() {
        std::shared_ptr<User> pUser(new User(m_io));
        m_acceptor.async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
    }

    void onAccept(std::shared_ptr<User> /*user*/) { listen(); }

    asio::io_service m_io;
    tcp::acceptor    m_acceptor{m_io};
};

int main() {
    Server s;
    s.run(8989);
}

请注意,一开始就没有任何广播,所以我不确定问题是什么。

看到的是

  • io_service
    已弃用(使用
    io_context
  • 传递
    io_service&
    是反模式(使用执行器)
  • 您非常需要区分对
    Server
    User
    的关注。每个用户会话的处理程序需要使用
    User
    中的状态,并且更自然地成为
    User
    的成员。事实上,这是正确执行此操作的唯一方法,因为使用会话外广播,您必须保证写入的正确顺序,这意味着您必须有一个每个会话(用户)队列
  • close_connection 是反模式(IO 是外部“现实”,你遵循它,而不是绕过去)。而是使用
    enable_shared_from_this
  • 执行此操作时,使
    m_users
    存储
    weak_ptr
    ,以便您可以观察连接何时终止

我会在30分钟左右画出一个固定草图。

© www.soinside.com 2019 - 2024. All rights reserved.