如何tokio::加入多个任务?

问题描述 投票:0回答:3

想象一下,一些 future 存储在一个

Vec
中,其长度是运行时确定的,你应该同时加入这些 future,你该怎么办?

显然,根据

tokio::join
文档中的示例,手动指定
Vec
的每个长度,例如 1、2、3...,并且处理合适的情况应该可行。

extern crate tokio;

let v = Vec::new();
v.push(future_1);

// directly or indirectly you push many futures to the vector
 
v.push(future_N);

// to join these futures concurrently one possible way is 

if v.len() == 0 {}
if v.len() == 1 { join!(v.pop()); }
if v.len() == 2 { join!(v.pop(), v.pop() ); }
// ...

我还注意到,当我使用类似

的语法时,
tokio::join!

在文档中采用列表作为参数
tokio::join!(v);

或者类似的东西

tokio::join![ v ] /  tokio::join![ v[..] ] / tokio::join![ v[..][..] ]

它根本不起作用

问题来了,是否有任何途径可以更有效地加入这些未来,或者我是否应该遗漏一些与文件所述相反的内容?

rust async-await concurrency rust-tokio
3个回答
29
投票

join_all
try_join_all
,以及来自同一个板条箱
FuturesOrdered
的更通用的
FuturesUnordered
futures
实用程序,被轮询为单个组合的未来。如果组成的 future 很简单并且通常不会同时准备好执行工作,那么这可能没问题,但是以这种方式组合 future 存在两个潜在问题。首先,您将无法在多线程运行时中利用 CPU 并行性。其次,如果 future 使用共享同步原语,这会带来“死锁的可能性”。如果确实出现这些问题,请考虑将各个 future 作为单独的任务生成并等待任务完成。 Tokio 1.21.0 或更高版本:JoinSet

在最近的 Tokio 版本中,您可以使用

JoinSet

获得最大的灵活性,包括中止所有任务的能力。当

JoinSet
被删除时,集合中的任务也会中止。
use tokio::task::JoinSet;

let mut set = JoinSet::new();

for fut in v {
    set.spawn(fut);
}

while let Some(res) = set.join_next().await {
    let out = res?;
    // ...
}

旧 API 

使用

tokio::spawn

生成任务并等待连接句柄:

use futures::future;

// ...

let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;

您还可以使用 
FuturesOrdered

FuturesUnordered
组合器在流中异步处理输出:
use futures::stream::FuturesUnordered;
use futures::prelude::*;

// ...

let mut completion_stream = v.into_iter()
    .map(tokio::spawn)
    .collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
    // ...    
}

以这种方式等待任务的一个警告是,当生成任务并可能拥有返回的 
JoinHandle

的 future(例如异步块)被删除时,任务不会被取消。需要使用

JoinHandle::abort
方法显式取消任务。
    



7
投票

#[tokio::main] async fn main() { let tasks = (0..5).map(|i| tokio::spawn(async move { sleep(Duration::from_secs(1)).await; // simulate some work i * 2 })).collect::<FuturesUnordered<_>>(); let result = futures::future::join_all(tasks).await; println!("{:?}", result); // [Ok(8), Ok(6), Ok(4), Ok(2), Ok(0)] }

游乐场

© www.soinside.com 2019 - 2024. All rights reserved.