如何使用 tokio 将依赖局部变量的 cpu 绑定任务与异步函数分开?

问题描述 投票:0回答:1

我是异步编程和 Rust 的新手。基本上,我正在做类似的事情。

fn func() {
    let task1 = tokio::spawn(async move || {
        let local_variable = ...;
        let res = heavy_computation(&local_variable);
        io(res).await;
    });
    let task2 = tokio::spawn(...);
}

计算可能会阻塞 tokio 运行时,并且

task2
可能会卡住。我想解决这个问题。

我尝试过

rayon::scope
std::thread::scope
,因为我不想克隆局部变量。喜欢

fn func() {
    let task1 = tokio::spawn(async move || {
    let local_variable = ...;
    std::thread::scope(|s| {
        let (send, recv) = channel;
        s.spawn(|| {
            heavy_computation(&local_variable);
            send.send();
        });
        tokio::spawn(async move {
            let res = recv.await.unwrap();
            io(res).await;
        });
    });
    let task2 = tokio::spawn(...);
}

但这似乎没有帮助。猜测是因为嵌套的运行时不相同。实在想不出什么好办法。

asynchronous rust rust-tokio
1个回答
0
投票

Alice Rhyl 有一篇很好的博客文章,介绍了如何处理异步中的阻塞代码。

“最简单”的选项只是通过“借用”作为移入和移出来避免共享状态,然后您可以按照调整后的博客中的 Rayon 示例进行操作:

    let (send, recv) = tokio::sync::oneshot::channel();

    let local_variable = ...;

    // Spawn a task on rayon or just spawn a new thread with std::thread::spawn.
    rayon::spawn(move || {
        
        let res = heavy_computation(local_variable);

        // Send the result back to Tokio.
        let _ = send.send((res, local_variable));
    });

    // Wait for the rayon task.
    let (res, local_variable) = recv.await.expect("Panic in rayon::spawn")

但是,如果 local 很大且无法优化,或者稍后在计算运行时在任务中需要它,那么这仍然不理想。另外,这并不符合你问题的实质。选项二是将其设为

Send
,而不是键入
T
执行
Arc<T>

let local_variable = std::sync::Arc::new(...);
    let copy = local_variable.clone();
    
    rayon::spawn(move || {
        
        heavy_computation(&*copy);

如果你确实想要共享和借用,那就是管理生命周期的问题。问题是您必须确保线程不会超出您的任务的寿命,该范围正在为您处理,因为它会自动加入线程。这就是导致阻塞的原因。

根据博客文章,用

tokio::spawn_blocking
替换 let task1 = tokio::spawn( 可以解决您的问题。生成阻塞任务处理等待线程、人造丝生成或作用域线程退出,而不会在附加计算线程执行其操作时阻塞其他任务工作人员。

也就是说,在我看来,使用 Arc 要简单得多,而且它还可以节省阻塞的线程。

© www.soinside.com 2019 - 2024. All rights reserved.