如何实现 CompletableFuture.allOf() 在任何 future 失败时异常完成?

问题描述 投票:0回答:3

我想实现

CompletableFuture.allOf()
CompletableFuture.anyOf()
的混合,其中一旦所有元素成功完成,返回的 future 就会成功完成,或者只要任何元素异常完成,它就会异常完成(具有相同的异常) 。在多个元素失败的情况下,返回其中任何一个的异常就足够了。

用例

我有一个任务需要聚合

CompletableFuture
列表返回的子结果,但是一旦其中任何一个失败,该任务就应该停止等待。我知道子任务将继续运行,这没关系。

相关问题

我发现 Waiting on a list of Future 最初看起来像是一个重复的问题,但接受的答案使用

CompletionService
,它需要
Callable
Runnable
作为输入。我正在寻找一种将已运行的
CompletionStage
作为输入的解决方案。

java completable-future
3个回答
10
投票

这个问题其实和 Replace Futures.successfulAsList with Java 8 CompletableFuture 非常相似?

虽然问题不完全相同,但相同的答案(来自我自己)应该可以满足您的需求。

您可以使用

allOf()
的组合来实现此功能,并将每个输入 future 与
exceptionally()
链接起来,这将使
allOf()
返回的 future 立即失败:

CompletableFuture<String> a = …, b = …, c = …;
CompletableFuture<Void> allWithFailFast = CompletableFuture.allOf(a, b, c);
Stream.of(a, b, c)
    .forEach(f -> f.exceptionally(e -> {
        allWithFailFast.completeExceptionally(e);
        return null;
    }));

1
投票

我相信这可以完成工作:

/**
 * @param arrayOfFutures an array of futures to wait on
 * @return a {@code CompletableFuture} that completes successfully once all elements have completed successfully, or completes
 * exceptionally after any of the elements has done the same
 * <br>{@code @throws NullPointerException} if {@code arrayOfFutures} is null
 */
public CompletableFuture<Void> waitForAllButAbortOnFirstException(CompletableFuture<?>... arrayOfFutures)
{
    if (arrayOfFutures == null)
      return CompletableFuture.failedFuture(new NullPointerException("arrayOfFutures may not be null"));
    if (arrayOfFutures.length == 0)
        return CompletableFuture.completedFuture(null);
    return CompletableFuture.anyOf(arrayOfFutures).
        thenApply(unused ->
        {
            // Remove any futures that completed normally and try again
            return Arrays.stream(arrayOfFutures).
                filter(element -> !element.isDone() || element.isCompletedExceptionally()).
                toArray(CompletableFuture[]::new);
        }).
        thenCompose(remainingFutures -> waitForAllButAbortOnFirstException(remainingFutures));
}

0
投票

根据@Gili的回答,我编写了我正在使用的这段代码 - 我认为它基本相同,但易于阅读,并且可能在非等待部分花费更少的时间:

/**
 * Wait for all of the futures to complete successfully unless at least one of
 * them has failed (completed with an exception) - then complete
 * immediately with the first exception received.
 * @param futures list of completable futures to wait on
 * @return a future that will complete successfully as soon as all submitted
 *   futures has completed successfully, or will complete exceptionally as soon
 *   as one of the submitted futures has completed exceptionally. 
 */
public static CompletableFuture<Void> allOfUnlessFailed(
        CompletableFuture<?> ...futures) {
    ConcurrentLinkedQueue<CompletableFuture<?>> queue =
            new ConcurrentLinkedQueue<>();
    for (CompletableFuture<?> f : futures) {
        queue.add(f);
        f.thenRun(() -> queue.remove(f));
    }
    return allOfUnlessFailed(queue);
}
    
private static CompletableFuture<Void> allOfUnlessFailed(
        ConcurrentLinkedQueue<CompletableFuture<?>> queue) {
    CompletableFuture<?>[] rest = queue.toArray(new CompletableFuture[] {});
    if (rest.length == 0)
        return CompletableFuture.completedFuture(null);
    return CompletableFuture.anyOf(rest)
            .thenCompose(__ -> allOfUnlessFailed(queue));
}
© www.soinside.com 2019 - 2024. All rights reserved.