我正在尝试直接使用 rayon; 的线程池并行化某些东西。
这个想法有两个步骤。首先假设您有 3 个单一任务 A、B、C。
我想以这样的方式在线程池中生成任务:当A终止时,它调用B,B在终止时调用C,最后C将调用A,在循环中重复该链。我想在不消失线程的情况下执行此操作。
第二步是做同样的事情,但现在 A、B、C 代表任务组而不是单个任务。所以说 A 代表同时在多个容器上运行的 10K 任务,那么 B 是一组执行类似操作的 50k 任务,然后 C 是单个任务,然后循环。
到目前为止,我所拥有的只是线程池初始化和生成虚拟任务:
let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
pool.spawn(
move || {
println!("dummy");
}
);
我遇到的主要问题是如何指定循环行为,因为我在文档中没有看到它。即如何创建任务链 A -> B -> C,然后如何将其扩展到任务组。
为了像你所说的那样排序(“当A终止时它调用B”),你需要有某种方式来注意到A的所有任务何时完成。最简单的方法是在 scope 中生成它们,等待所有任务完成。然后,如果您这样做,您不妨使用单个函数来对 A、B 和 C 的范围进行排序。
这是一个演示程序。请注意,在泳池内部,使用的是
rayon::scope
,而不是 pool.scope()
;这仍然使用 pool
,因为它是在 pool
的上下文中执行的。
fn main() {
let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
pool.spawn(move || {
for _ in 0..10 { // this would be `loop {` in the real program
rayon::scope(|scope| {
for _ in 0..10 {
scope.spawn(|_| println!("task A"));
}
});
rayon::scope(|scope| {
for _ in 0..10 {
scope.spawn(|_| println!("task B"));
}
});
rayon::scope(|scope| {
for _ in 0..10 {
scope.spawn(|_| println!("task C"));
}
});
}
});
// Apparently dropping the pool stops spawns, so keep it alive for a bit.
std::thread::sleep(std::time::Duration::from_secs(1));
}