想象一下,一些 future 存储在一个
Vec
中,其长度是运行时确定的,你应该同时加入这些 future,你该怎么办?
显然,根据
tokio::join
文档中的示例,手动指定 Vec
的每个长度,例如 1、2、3...,并且处理合适的情况应该可行。
extern crate tokio;
let v = Vec::new();
v.push(future_1);
// directly or indirectly you push many futures to the vector
v.push(future_N);
// to join these futures concurrently one possible way is
if v.len() == 0 {}
if v.len() == 1 { join!(v.pop()); }
if v.len() == 2 { join!(v.pop(), v.pop() ); }
// ...
我还注意到,当我使用类似
的语法时,
tokio::join!
在文档中采用列表作为参数
tokio::join!(v);
或者类似的东西
tokio::join![ v ] / tokio::join![ v[..] ] / tokio::join![ v[..][..] ]
它根本不起作用
问题来了,是否有任何途径可以更有效地加入这些未来,或者我是否应该遗漏一些与文件所述相反的内容?
join_all
和 try_join_all
,以及来自同一个板条箱 FuturesOrdered
的更通用的 FuturesUnordered
和 futures
实用程序,被轮询为单个组合的未来。如果组成的 future 很简单并且通常不会同时准备好执行工作,那么这可能没问题,但是以这种方式组合 future 存在两个潜在问题。首先,您将无法在多线程运行时中利用 CPU 并行性。其次,如果 future 使用共享同步原语,这会带来“死锁的可能性”。如果确实出现这些问题,请考虑将各个 future 作为单独的任务生成并等待任务完成。
Tokio 1.21.0 或更高版本:JoinSet
JoinSet
获得最大的灵活性,包括中止所有任务的能力。当
JoinSet
被删除时,集合中的任务也会中止。use tokio::task::JoinSet;
let mut set = JoinSet::new();
for fut in v {
set.spawn(fut);
}
while let Some(res) = set.join_next().await {
let out = res?;
// ...
}
旧 API
tokio::spawn
生成任务并等待连接句柄:
use futures::future;
// ...
let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;
您还可以使用
FuturesOrdered
和
FuturesUnordered
组合器在流中异步处理输出:use futures::stream::FuturesUnordered;
use futures::prelude::*;
// ...
let mut completion_stream = v.into_iter()
.map(tokio::spawn)
.collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
// ...
}
以这种方式等待任务的一个警告是,当生成任务并可能拥有返回的
JoinHandle
的 future(例如异步块)被删除时,任务不会被取消。需要使用
JoinHandle::abort
方法显式取消任务。#[tokio::main]
async fn main() {
let tasks = (0..5).map(|i| tokio::spawn(async move {
sleep(Duration::from_secs(1)).await; // simulate some work
i * 2
})).collect::<FuturesUnordered<_>>();
let result = futures::future::join_all(tasks).await;
println!("{:?}", result); // [Ok(8), Ok(6), Ok(4), Ok(2), Ok(0)]
}