我想学习Rust,所以我决定用它来做一个实际的项目。
这个想法是要有一个服务器,它可以
从主线 A
开启新线程 B
执行一些异步任务,通过时间产生一个值流。
接收客户端websocket连接 [c, d, e, ..]
异步处理,并同时生成新的线程。[C, D, E, ...]
将线程B中产生的值发送给线程 [C, D, E, ...]
每根线 [C, D, E, ...]
将价值发布到各自的客户机上,在 [c, d, e, ..]
我使用的是
tokio
催生新线程和 tokio::sync::mpsc::unbounded_channel
中计算出的值,以发送 B
到其他线路
tokio_tungstenite
来管理websocket连接并向客户端发送值。
我设法得到一个工作的例子,其中线程 B
产生整数和固定的时间间隔。当服务器启动时。B
开始产生一个值流 [0,1,2,3, ..]
.
当打开一个新的 websocket 连接时,客户端将从连接打开后产生的值开始接收数据流(因此,如果连接开始后的值是一个新的值,那么客户端将接收一个新的数据流。3
是由 B
的值,那么客户端将从 4
上)。)
这里是陷阱。
我发现的唯一方法,为接收部分的通道在 C
以异步接收值(因此防止它缓冲值并将其发送给 c
正当 B
是完全完成的)是使用一个循环,我相信这个循环会消耗100%的CPU。
我注意到,正因为如此,每一个websocket连接都会消耗100%的CPU(所以如果有两个连接打开,CPU使用量将是200%,以此类推)。
这是一个循环。
loop {
while let Ok(v) = rx.try_recv() {
println!("PRINTER ID [{}] | RECEIVED: {:#?}", addr, v);
println!("PRINTER ID [{}] | SENDING TO WS: {:#?}", addr, v);
let mess = Message::Text(v.to_string());ws_sender.send(mess).await?;
}
如果我使用 recv()
(而不是 try_recv()
)的值将会被缓冲,并在当 B
已完成。
我试着用 futures_channel::unbounded
而非 tokio
通道,但我有同样的缓冲区问题。
问题:如何重写上面的循环,以避免使用100%和流值到websocket而不阻塞?
你可以看到这里的tokio服务器。https:/github.comceikitasync_datablobmastersrcbintokio_server.rs。
你可以通过在另一个终端窗口中运行客户端旋转websocket连接来测试它。
需要改变 thread::sleep
使用 futures-timer
和 sync::Mutex
到 futures::lock::Mutex
那么,a while-let
与 recv()
一帆风顺