如何立即将任务从一个ThreadPool传递到另一个?

问题描述 投票:0回答:2

我有一个输入元素列表,我想要排队到几个ThreadPools。让我们说这是我的意见:

final List<Integer> ints = Stream.iterate(1, i -> i + 1).limit(100).collect(Collectors.toList());

这些是我希望元素逐个遍历的三个函数:

final Function<Integer, Integer> step1 =
        value -> { // input from the ints list
            return value * 2;
        };

final Function<Integer, Double> step2 =
        value -> { // input from the previous step1
            return (double) (value * 2); //
        };

final Function<Double, String> step3 =
        value -> { // input from the previous step2
            return "Result: " + value * 2;
        };

这些将是每个步骤的池:

final ExecutorService step1Pool = Executors.newFixedThreadPool(4);
final ExecutorService step2Pool = Executors.newFixedThreadPool(3);
final ExecutorService step3Pool = Executors.newFixedThreadPool(1);

我希望每个元素都通过step1Pool运行并应用step1。一旦完成一个元素,其结果应该最终在step2pool中,以便可以在此处应用step2。一旦step2Pool中的某些东西完成,它应该在step3Pool排队,并且应该应用step3。在我的主线程上,我想等到step3获得所有结果。处理每个元素的顺序无关紧要。只有他们都在正确的线程池上运行step1 - > step2 - > step3

基本上我想并行化Stream.map,立即将每个结果推送到下一个队列,并等到我从上一个线程池返回ints.size()结果。

有一种简单的方法可以在Java中实现吗?

java threadpool executorservice
2个回答
4
投票

我相信CompletableFuture会帮助你!

List<CompletableFuture<String>> futures = ints.stream()
            .map(i -> CompletableFuture.supplyAsync(() -> step1.apply(i), step1Pool)
                    .thenApplyAsync(step2, step2Pool)
                    .thenApplyAsync(step3, step3Pool))
            .collect(Collectors.toList());
List<String> result = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());

0
投票

更好地使用流:

List<String> stringList = Stream.iterate(1, i -> i + 1)
                .limit(100)
                .parallel()
                .map(step1)
                .map(step2)
                .map(step3)
                .collect(Collectors.toList());
© www.soinside.com 2019 - 2024. All rights reserved.