Java CompletableFuture然后由多个异步任务组成

问题描述 投票:2回答:1

我有一个由2个异步步骤组成的过程。第二步基于第一步的结果运行。该过程是循环启动的。挑战在于,第二步是由多个异步任务完成的,这些任务将第一步迭代的输出作为结果。第一步完成后,我想使用此第一步结果启动n秒步骤。我使用CompletableFuturethenCompose编写了这段代码。

它有效,但我发现它相当复杂,我想知道这是否是正确的方法。我特别想知道,管理第二级子任务和使用CompletableFuture.allOf使其像一个CompletableFuture一样是否是正确的方法。

public void test() {
    // Gather CompletableFutures to wait for them at the end
    List<CompletableFuture> futures = new ArrayList<>();

    // First steps
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        CompletableFuture<Void> fut = CompletableFuture.supplyAsync(() -> {
            logger.debug("Start step 1 - " + finalI);
            simulateLongProcessing();// just waits for 1 s
            logger.debug("End step 1 - " + finalI);
            return "step1 output - " + finalI;
        }).thenCompose(s -> {
            List<CompletableFuture> subFutures = new ArrayList<>();
            // Second step : Launch several sub-tasks based on the result of the first step
            for (int j = 0; j < 50; j++) {
                final int finalJ = j;
                CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
                    logger.debug("Start - step 2 : " + s + " | " + finalJ);
                    simulateLongProcessing();
                    logger.debug("End - step 2 : " + s + " | " + finalJ);
                    return "step2 output - " + s + " | " + finalJ;
                });
                subFutures.add(f);
            }
            return CompletableFuture.allOf(subFutures.toArray(new CompletableFuture[0]));
        });
        futures.add(fut);
    }

    // Wait for the completion
    for (CompletableFuture future : futures) {
        future.join();
    }
}
java concurrency completable-future
1个回答
0
投票

当您可以使用简单的CompletableFuture.supplyAsync链接相同的从属函数时,请勿在传递给thenCompose的函数中执行thenApplyAsync。>

通过thenApplyAsync绑定从属函数可让您在完成第一步之前获取代表这些步骤的CompletableFuture实例,因此您可以将它们全部收集到您的List中以等待最后完成,而不必不需要通过CompletableFuture.allOf创建复合期货。

public void test() {
    // Gather CompletableFutures to wait for them at the end
    List<CompletableFuture<?>> futures = new ArrayList<>();

    for (int i = 0; i < 10; i++) {
        int finalI = i;
        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
            logger.debug("Start step 1 - " + finalI);
            simulateLongProcessing();// just waits for 1 s
            logger.debug("End step 1 - " + finalI);
            return "step1 output - " + finalI;
        });
        // Second step : Chain several sub-tasks based on the result of the first step
        for (int j = 0; j < 50; j++) {
            final int finalJ = j;
            futures.add(step1.thenApplyAsync(s -> {
                logger.debug("Start - step 2 : " + s + " | " + finalJ);
                simulateLongProcessing();
                logger.debug("End - step 2 : " + s + " | " + finalJ);
                return "step2 output - " + s + " | " + finalJ;
            }));
        }
    }

    // Wait for the completion
    for (CompletableFuture<?> future : futures) {
        future.join();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.