如何在 Rust 中将消息从同步线程发送到异步任务

问题描述 投票:0回答:1

我有一个已经以同步风格编写的程序,它使用了一些需要它的必要库。但我认为它可能会受益于使用

async
功能来实现某些功能。所以我想看看如何通过通道在这些部分之间发送消息,其中发送路径是同步的,并且接收部分位于
async
任务中。 例如,
std::sync::mpsc
tokio::sync::mpsc
中都有 MPSC 通道,它们看起来非常相似,并且似乎不支持我想要的情况。

rust async-await channel rust-tokio
1个回答
0
投票

tokio
文档有一个涵盖此类案例的教程部分这里。在“发送消息”部分中有一个通过通道发送消息的示例,正是针对这种情况。

下面是一个稍微修改过的使用多线程运行时的示例。为了练习,我也尝试做同样的技巧,但使用

std::sync::mpsc
,结果有点复杂,但也是可行的。

Cargo.toml
中的依赖关系:

[dependencies]
tokio = { version = "1", features = ["full"] }
rand = "0.8.5"

src/main.rs
是:

use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::Receiver;
use std::task::Poll;
use std::thread;
use std::time::Duration;

use tokio::runtime::Runtime;
use tokio::sync::mpsc;

type Message = u64;

pub struct SyncReceiver<'a> (&'a mut Receiver<Message>);

impl Future for SyncReceiver<'_>
{
    type Output = Option<Message>;

    fn poll(self: Pin<&mut Self>, _: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        match self.0.try_recv() {
            Ok(x) => Poll::Ready(Some(x)),
            Err(std::sync::mpsc::TryRecvError::Empty) => Poll::Pending,
            Err(std::sync::mpsc::TryRecvError::Disconnected) => Poll::Ready(None)
        }
    }
}

fn main() {
    let rt = Runtime::new().expect("Start tokio runtime.");

    let (sender_async, mut receiver_async) = mpsc::channel::<Message>(10);
    let (sender_sync, mut receiver_sync) = std::sync::mpsc::channel::<Message>();

    rt.spawn(async move {
        loop {
            let x = receiver_async.recv().await;
            if x == None { break; }
            println!("Recv async channel: {:?}", x);

            let x = SyncReceiver(&mut receiver_sync).await;
            if x == None { break; }
            println!("Recv sync channel: {:?}", x);
        }
    });

    loop {
        thread::sleep(Duration::from_millis(600));
        match sender_async.blocking_send(rand::random::<Message>()) {
            Err(e) => println!("Async send err: {}", e),
            Ok(()) => println!("Async sent."),
        }

        match sender_sync.send(rand::random::<Message>()) {
            Err(e) => println!("Sync send err: {}", e),
            Ok(()) => println!("Sync sent."),
        }
    }
}

因此,对于

tokio::sync::mpsc
情况,示例在同步端使用
blocking_send
方法。在
std::sync::mpsc
的情况下,实现了一个适配器,将 recv() -> Result 方法转换为
Future
,可以在具有
async
语法的
.await
代码中正常使用。

注意

.await
拥有其参数的所有权,因此每次都必须创建适配器。

© www.soinside.com 2019 - 2024. All rights reserved.