我编写了下面的代码,用 Rust 制作一个接受多个客户端的聊天服务器。我正处于一个阶段(代码可用here)它能够接受多个客户端并将一个客户端的消息写入所有其他客户端。我尝试创建一个结构来保存用户数据,它存储在向量中并包装在 Arc-Mutex 中。
我现在遇到的问题是来自一个用户的消息没有显示在其他用户中(代码到达写入套接字的部分,我使用 print 语句进行验证,但另一端没有出现任何内容) 。但是,当我关闭客户端连接时,如果有另一个客户端连接,它将向该客户端发送所有消息(例如,连接客户端 1,发送几条消息,连接客户端 2。现在,我在客户端 2 上执行的任何操作都不会显示)在服务器上,不会触发任何调试语句。我在客户端 1 上发送的任何消息都与之前相同。现在,如果我断开客户端 1 的连接,则连接客户端 2 后发送的任何消息都会发送到客户端 2。对于任何数量,情况都是如此连接数)。我假设这是我如何存储连接的客户端数据的问题,但我学习还为时过早,而且还没有足够的知识来可靠地弄清楚任何事情
use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::TcpListener};
use tokio::sync::{broadcast, Mutex as TokioMutex};
use std::sync::{Arc, Mutex};
// Define a struct to store user information including their username
#[derive(Debug)]
struct UserInfo {
username: String,
addr: std::net::SocketAddr,
}
#[tokio::main]
async fn main() {
let listener: TcpListener = TcpListener::bind("localhost:8080").await.unwrap();
let (tx, _rx) = broadcast::channel(10);
// let users = Arc::new(Mutex::new(vec![]));
let users = Arc::new(TokioMutex::new(vec![]));
loop {
let (mut socket, addr) = listener.accept().await.unwrap();
println!("New connection from: {}", addr);
let tx = tx.clone();
let users = users.clone();
let mut rx = tx.subscribe();
tokio::spawn(async move{
// Ask for username
let username = ask_for_username(&mut socket).await.unwrap();
println!("User {} connected from: {}", username, addr);
// Store user information
let user_info = UserInfo {
username: username.clone(),
addr,
};
// Add user to the list of users
let mut users_guard = users.lock().await;
users_guard.push(user_info);
let (read_half, mut write_half) = socket.split();
let mut reader = BufReader::new(read_half);
let mut line = String::new();
loop {
tokio::select! {
result = reader.read_line(&mut line) => {
if result.unwrap() == 0 {
break;
}
println!("Broadcasting message from {}: {}", username, line);
tx.send((line.clone(), addr)).unwrap();
line.clear();
},
result = rx.recv() => {
let (msg, other_addr ) = result.unwrap();
println!("Received message from another connection: {:?}", msg);
write_half.write_all(msg.as_bytes()).await.unwrap();
println!("Message sent to the user");
}
}
}
});
}
}
async fn ask_for_username(socket: &mut tokio::net::TcpStream) -> Result<String, std::io::Error> {
let mut username = String::new();
// Send a message asking for username
socket.write_all(b"Please enter your username: ").await?;
// Read the username from the client
let mut reader = BufReader::new(socket);
reader.read_line(&mut username).await?;
Ok(username.trim().to_string())
}
如果查看存储库,主分支包含标准聊天服务器实现,而用户名分支正在跟踪实现用户名的尝试。正如前面提到的,我还没有取得任何成功。我尝试过使用 tokio 和 std 互斥体,但两者都会产生相同类型的错误。
您的问题在这里:
// Add user to the list of users
let mut users_guard = users.lock().await;
users_guard.push(user_info);
users_guard
一直保留在范围内,直到封闭块的末尾,因此在此之前一直保持锁定。因此,在任何给定时间只有一个客户端可以处于读取循环中;其他客户端保持挂起状态,等待锁定,直到该客户端断开连接,此时互斥锁被解锁,另一个客户端可以取得进展。
您应该显式地
drop(users_guard);
,将上述代码放入其自己的块中,或者将上面的代码更改为users.lock().await.push(user_info);
,以确保锁定的持有时间不会超过必要的时间。