如何使用 Rayon 的线程池生成重复的任务链?

问题描述 投票:0回答:1

我正在尝试直接使用 rayon; 的线程池并行化某些东西。

这个想法有两个步骤。首先假设您有 3 个单一任务 A、B、C。

我想以这样的方式在线程池中生成任务:当A终止时,它调用B,B在终止时调用C,最后C将调用A,在循环中重复该链。我想在不消失线程的情况下执行此操作。

第二步是做同样的事情,但现在 A、B、C 代表任务组而不是单个任务。所以说 A 代表同时在多个容器上运行的 10K 任务,那么 B 是一组执行类似操作的 50k 任务,然后 C 是单个任务,然后循环。

到目前为止,我所拥有的只是线程池初始化和生成虚拟任务:

    
    let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
    pool.spawn(
        move || {
            println!("dummy");
        }
    );

我遇到的主要问题是如何指定循环行为,因为我在文档中没有看到它。即如何创建任务链 A -> B -> C,然后如何将其扩展到任务组。

multithreading rust parallel-processing rayon
1个回答
0
投票

为了像你所说的那样排序(“当A终止时它调用B”),你需要有某种方式来注意到A的所有任务何时完成。最简单的方法是在 scope 中生成它们,等待所有任务完成。然后,如果您这样做,您不妨使用单个函数来对 A、B 和 C 的范围进行排序。

这是一个演示程序。请注意,在泳池内部,使用的是

rayon::scope
,而不是
pool.scope()
;这仍然使用
pool
,因为它是在
pool
的上下文中执行的。

fn main() {
    let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
    pool.spawn(move || {
        for _ in 0..10 { // this would be `loop {` in the real program
            rayon::scope(|scope| {
                for _ in 0..10 {
                    scope.spawn(|_| println!("task A"));
                }
            });

            rayon::scope(|scope| {
                for _ in 0..10 {
                    scope.spawn(|_| println!("task B"));
                }
            });
    
            rayon::scope(|scope| {
                for _ in 0..10 {
                    scope.spawn(|_| println!("task C"));
                }
            });
        }
    });

    // Apparently dropping the pool stops spawns, so keep it alive for a bit.
    std::thread::sleep(std::time::Duration::from_secs(1));
}
© www.soinside.com 2019 - 2024. All rights reserved.