无法为其他客户端生成消息

问题描述 投票:0回答:1

我编写了下面的代码,用 Rust 制作一个接受多个客户端的聊天服务器。我正处于一个阶段(代码可用here)它能够接受多个客户端并将一个客户端的消息写入所有其他客户端。我尝试创建一个结构来保存用户数据,它存储在向量中并包装在 Arc-Mutex 中。

我现在遇到的问题是来自一个用户的消息没有显示在其他用户中(代码到达写入套接字的部分,我使用 print 语句进行验证,但另一端没有出现任何内容) 。但是,当我关闭客户端连接时,如果有另一个客户端连接,它将向该客户端发送所有消息(例如,连接客户端 1,发送几条消息,连接客户端 2。现在,我在客户端 2 上执行的任何操作都不会显示)在服务器上,不会触发任何调试语句。我在客户端 1 上发送的任何消息都与之前相同。现在,如果我断开客户端 1 的连接,则连接客户端 2 后发送的任何消息都会发送到客户端 2。对于任何数量,情况都是如此连接数)。我假设这是我如何存储连接的客户端数据的问题,但我学习还为时过早,而且还没有足够的知识来可靠地弄清楚任何事情

use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::TcpListener};
use tokio::sync::{broadcast, Mutex as TokioMutex};
use std::sync::{Arc, Mutex};



// Define a struct to store user information including their username
#[derive(Debug)]
struct UserInfo {
    username: String,
    addr: std::net::SocketAddr,
}

#[tokio::main]
async fn main() {
    let listener: TcpListener = TcpListener::bind("localhost:8080").await.unwrap();

    let (tx, _rx) = broadcast::channel(10);

    // let users = Arc::new(Mutex::new(vec![]));
    let users = Arc::new(TokioMutex::new(vec![]));

    loop {
        let (mut socket, addr) = listener.accept().await.unwrap();
        println!("New connection from: {}", addr);

        let tx = tx.clone();
        let users = users.clone();
        let mut rx = tx.subscribe();

        tokio::spawn(async move{
            // Ask for username
            let username = ask_for_username(&mut socket).await.unwrap();
            println!("User {} connected from: {}", username, addr);

            // Store user information
            let user_info = UserInfo {
                username: username.clone(),
                addr,
            };

            // Add user to the list of users
            let mut users_guard = users.lock().await;
            users_guard.push(user_info);

            let (read_half, mut write_half) = socket.split();

            let mut reader = BufReader::new(read_half);
            let mut line = String::new();

            loop {
                tokio::select! {
                    result = reader.read_line(&mut line) => {
                        if result.unwrap() == 0 {
                            break;
                        }
                        println!("Broadcasting message from {}: {}", username, line);
                        tx.send((line.clone(), addr)).unwrap();

                        line.clear();
                    },
                    result = rx.recv() => {
                        let (msg, other_addr ) = result.unwrap();

                        println!("Received message from another connection: {:?}", msg);


                        write_half.write_all(msg.as_bytes()).await.unwrap();

                        println!("Message sent to the user");
                    }
                }

            }
        });
    }

}

async fn ask_for_username(socket: &mut tokio::net::TcpStream) -> Result<String, std::io::Error> {
    let mut username = String::new();

    // Send a message asking for username
    socket.write_all(b"Please enter your username: ").await?;

    // Read the username from the client
    let mut reader = BufReader::new(socket);
    reader.read_line(&mut username).await?;

    Ok(username.trim().to_string())
}

如果查看存储库,主分支包含标准聊天服务器实现,而用户名分支正在跟踪实现用户名的尝试。正如前面提到的,我还没有取得任何成功。我尝试过使用 tokio 和 std 互斥体,但两者都会产生相同类型的错误。

rust concurrency rust-tokio
1个回答
0
投票

您的问题在这里:

// Add user to the list of users
let mut users_guard = users.lock().await;
users_guard.push(user_info);

users_guard
一直保留在范围内,直到封闭块的末尾,因此在此之前一直保持锁定。因此,在任何给定时间只有一个客户端可以处于读取循环中;其他客户端保持挂起状态,等待锁定,直到该客户端断开连接,此时互斥锁被解锁,另一个客户端可以取得进展。

您应该显式地

drop(users_guard);
,将上述代码放入其自己的块中,或者将上面的代码更改为
users.lock().await.push(user_info);
,以确保锁定的持有时间不会超过必要的时间。

© www.soinside.com 2019 - 2024. All rights reserved.