Rust多线程异步Websocket服务器。

问题描述 投票:-1回答:1

我想学习Rust,所以我决定用它来做一个实际的项目。

这个想法是要有一个服务器,它可以

  1. 从主线 A 开启新线程 B 执行一些异步任务,通过时间产生一个值流。

  2. 接收客户端websocket连接 [c, d, e, ..] 异步处理,并同时生成新的线程。[C, D, E, ...]

  3. 将线程B中产生的值发送给线程 [C, D, E, ...]

  4. 每根线 [C, D, E, ...] 将价值发布到各自的客户机上,在 [c, d, e, ..]

我使用的是

  • tokio 催生新线程和 tokio::sync::mpsc::unbounded_channel 中计算出的值,以发送 B 到其他线路

  • tokio_tungstenite 来管理websocket连接并向客户端发送值。

我设法得到一个工作的例子,其中线程 B 产生整数和固定的时间间隔。当服务器启动时。B 开始产生一个值流 [0,1,2,3, ..].

当打开一个新的 websocket 连接时,客户端将从连接打开后产生的值开始接收数据流(因此,如果连接开始后的值是一个新的值,那么客户端将接收一个新的数据流。3 是由 B的值,那么客户端将从 4 上)。)

这里是陷阱。

我发现的唯一方法,为接收部分的通道在 C 以异步接收值(因此防止它缓冲值并将其发送给 c 正当 B 是完全完成的)是使用一个循环,我相信这个循环会消耗100%的CPU。

我注意到,正因为如此,每一个websocket连接都会消耗100%的CPU(所以如果有两个连接打开,CPU使用量将是200%,以此类推)。

这是一个循环。

loop {
    while let Ok(v) = rx.try_recv() {
       println!("PRINTER ID [{}] | RECEIVED: {:#?}", addr, v);
       println!("PRINTER ID [{}] | SENDING TO WS: {:#?}", addr, v);
       let mess = Message::Text(v.to_string());ws_sender.send(mess).await?;
}

如果我使用 recv() (而不是 try_recv())的值将会被缓冲,并在当 B 已完成。

我试着用 futures_channel::unbounded 而非 tokio 通道,但我有同样的缓冲区问题。

问题:如何重写上面的循环,以避免使用100%和流值到websocket而不阻塞?

你可以看到这里的tokio服务器。https:/github.comceikitasync_datablobmastersrcbintokio_server.rs。

你可以通过在另一个终端窗口中运行客户端旋转websocket连接来测试它。

multithreading websocket rust rust-tokio
1个回答
0
投票

需要改变 thread::sleep 使用 futures-timersync::Mutexfutures::lock::Mutex那么,a while-letrecv() 一帆风顺

© www.soinside.com 2019 - 2024. All rights reserved.