我尝试通过 async_write 发送消息,但它们仅在我关闭服务器(ctrl-c)后发送
例如: 作为客户端,我发送“test”和“test2”,只有在关闭服务器后客户端才会收到“testtest2”
我正在进行一个聊天,该聊天接受来自用户的消息(成功)并且必须将其广播给每个人
消息发送代码
void Server::writeHandler(int id, boost::system::error_code error){
if (!error){
std::cout << "[DEBUG] message broadcasted ";
} else {
close_connection(id);
}
}
void Server::broadcast(std::string msg, boost::system::error_code error){
for (auto& user : m_users){ // for every user in unordered map of users
asio::async_write(user.second->socket, asio::buffer(msg, msg.size()),
std::bind(&Server::writeHandler, this, user.first, std::placeholders::_1));
}
}
onMessage 中的广播调用
void Server::onMessage(int id, boost::system::error_code error){
if (!error){
broadcast(m_read_msg, error); // char m_read_msg[PACK_SIZE] // PACK_SIZE = 512
asio::async_read(m_users[id].get()->socket, asio::buffer(m_read_msg, PACK_SIZE), // PACK_SIZE = 512
std::bind(&Server::onMessage, this, id, std::placeholders::_1));
} else {
close_connection(id);
}
}
服务器运行功能:
void Server::run(int port){
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
std::cout << "[DEBUG] binded on " << port << std::endl;
// acceptor initialization
m_acceptor = std::make_shared<asio::ip::tcp::acceptor>(m_io, endpoint);
// start to listen for connections
listen();
// run the io_service in thread
io_thread = std::thread( [&]{ m_io.run(); } ); // m_io = io_serivce
while (true){
}
}
void Server::listen(){
std::shared_ptr<User> pUser(new User(m_io));
m_acceptor->async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
}
无限循环是 C++ 中的UB。
您的
std::string msg
是本地(参数),将其传递给异步操作 (async_write
) 也是 UB。
启动一个线程,紧接着无限循环是没有用的,除非目的是浪费大量CPU资源。所以,将其替换为
m_io.run();
接下来,代码远非独立,并且缺乏各种错误处理方式。这是我想象的最小完成度,修复了已经提到的 UB:
#include <boost/asio.hpp>
#include <iostream>
#include <map>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;
struct User {
User(asio::io_service& io) : socket(io) {}
tcp::socket socket;
};
struct Server {
void run(uint16_t port) {
m_acceptor = {m_io, {{}, port}};
std::cout << "[DEBUG] bound on " << port << std::endl;
listen();
m_io.run();
}
void close_connection(int) {}
std::map<int, std::shared_ptr<User>> m_users;
void writeHandler(int id, error_code error, std::shared_ptr<void>) {
if (!error) {
std::cout << "[DEBUG] message broadcasted ";
} else {
close_connection(id);
}
}
void broadcast(std::string msg, boost::system::error_code /*error*/) {
auto shared_msg = std::make_shared<std::string>(std::move(msg));
for (auto& [id, usr] : m_users) { // for every user in unordered map of users
asio::async_write(usr->socket, asio::buffer(*shared_msg),
bind(&Server::writeHandler, this, id, _1, shared_msg));
}
}
void listen() {
std::shared_ptr<User> pUser(new User(m_io));
m_acceptor.async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
}
void onAccept(std::shared_ptr<User> /*user*/) { listen(); }
asio::io_service m_io;
tcp::acceptor m_acceptor{m_io};
};
int main() {
Server s;
s.run(8989);
}
请注意,一开始就没有任何广播,所以我不确定问题是什么。
我能看到的是
io_service
已弃用(使用 io_context
)io_service&
是反模式(使用执行器)Server
和 User
的关注。每个用户会话的处理程序需要使用 User
中的状态,并且更自然地成为 User
的成员。事实上,这是正确执行此操作的唯一方法,因为使用会话外广播,您必须保证写入的正确顺序,这意味着您必须有一个每个会话(用户)队列enable_shared_from_this
m_users
存储 weak_ptr
,以便您可以观察连接何时终止我会在30分钟左右画出一个固定草图。