我有一个已经以同步风格编写的程序,它使用了一些需要它的必要库。但我认为它可能会受益于使用
async
功能来实现某些功能。所以我想看看如何通过通道在这些部分之间发送消息,其中发送路径是同步的,并且接收部分位于 async
任务中。
例如,std::sync::mpsc
和tokio::sync::mpsc
中都有 MPSC 通道,它们看起来非常相似,并且似乎不支持我想要的情况。
tokio
文档有一个涵盖此类案例的教程部分这里。在“发送消息”部分中有一个通过通道发送消息的示例,正是针对这种情况。
下面是一个稍微修改过的使用多线程运行时的示例。为了练习,我也尝试做同样的技巧,但使用
std::sync::mpsc
,结果有点复杂,但也是可行的。
Cargo.toml
中的依赖关系:
[dependencies]
tokio = { version = "1", features = ["full"] }
rand = "0.8.5"
src/main.rs
是:
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::Receiver;
use std::task::Poll;
use std::thread;
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
type Message = u64;
pub struct SyncReceiver<'a> (&'a mut Receiver<Message>);
impl Future for SyncReceiver<'_>
{
type Output = Option<Message>;
fn poll(self: Pin<&mut Self>, _: &mut std::task::Context<'_>) -> Poll<Self::Output> {
match self.0.try_recv() {
Ok(x) => Poll::Ready(Some(x)),
Err(std::sync::mpsc::TryRecvError::Empty) => Poll::Pending,
Err(std::sync::mpsc::TryRecvError::Disconnected) => Poll::Ready(None)
}
}
}
fn main() {
let rt = Runtime::new().expect("Start tokio runtime.");
let (sender_async, mut receiver_async) = mpsc::channel::<Message>(10);
let (sender_sync, mut receiver_sync) = std::sync::mpsc::channel::<Message>();
rt.spawn(async move {
loop {
let x = receiver_async.recv().await;
if x == None { break; }
println!("Recv async channel: {:?}", x);
let x = SyncReceiver(&mut receiver_sync).await;
if x == None { break; }
println!("Recv sync channel: {:?}", x);
}
});
loop {
thread::sleep(Duration::from_millis(600));
match sender_async.blocking_send(rand::random::<Message>()) {
Err(e) => println!("Async send err: {}", e),
Ok(()) => println!("Async sent."),
}
match sender_sync.send(rand::random::<Message>()) {
Err(e) => println!("Sync send err: {}", e),
Ok(()) => println!("Sync sent."),
}
}
}
因此,对于
tokio::sync::mpsc
情况,示例在同步端使用 blocking_send
方法。在 std::sync::mpsc
的情况下,实现了一个适配器,将 recv() -> Result 方法转换为 Future
,可以在具有 async
语法的 .await
代码中正常使用。
注意
.await
拥有其参数的所有权,因此每次都必须创建适配器。