我想实现
CompletableFuture.allOf()
和 CompletableFuture.anyOf()
的混合,其中一旦所有元素成功完成,返回的 future 就会成功完成,或者只要任何元素异常完成,它就会异常完成(具有相同的异常) 。在多个元素失败的情况下,返回其中任何一个的异常就足够了。
我有一个任务需要聚合
CompletableFuture
列表返回的子结果,但是一旦其中任何一个失败,该任务就应该停止等待。我知道子任务将继续运行,这没关系。
我发现 Waiting on a list of Future 最初看起来像是一个重复的问题,但接受的答案使用
CompletionService
,它需要 Callable
或 Runnable
作为输入。我正在寻找一种将已运行的 CompletionStage
作为输入的解决方案。
这个问题其实和 Replace Futures.successfulAsList with Java 8 CompletableFuture 非常相似?
虽然问题不完全相同,但相同的答案(来自我自己)应该可以满足您的需求。
您可以使用
allOf()
的组合来实现此功能,并将每个输入 future 与 exceptionally()
链接起来,这将使 allOf()
返回的 future 立即失败:
CompletableFuture<String> a = …, b = …, c = …;
CompletableFuture<Void> allWithFailFast = CompletableFuture.allOf(a, b, c);
Stream.of(a, b, c)
.forEach(f -> f.exceptionally(e -> {
allWithFailFast.completeExceptionally(e);
return null;
}));
我相信这可以完成工作:
/**
* @param arrayOfFutures an array of futures to wait on
* @return a {@code CompletableFuture} that completes successfully once all elements have completed successfully, or completes
* exceptionally after any of the elements has done the same
* <br>{@code @throws NullPointerException} if {@code arrayOfFutures} is null
*/
public CompletableFuture<Void> waitForAllButAbortOnFirstException(CompletableFuture<?>... arrayOfFutures)
{
if (arrayOfFutures == null)
return CompletableFuture.failedFuture(new NullPointerException("arrayOfFutures may not be null"));
if (arrayOfFutures.length == 0)
return CompletableFuture.completedFuture(null);
return CompletableFuture.anyOf(arrayOfFutures).
thenApply(unused ->
{
// Remove any futures that completed normally and try again
return Arrays.stream(arrayOfFutures).
filter(element -> !element.isDone() || element.isCompletedExceptionally()).
toArray(CompletableFuture[]::new);
}).
thenCompose(remainingFutures -> waitForAllButAbortOnFirstException(remainingFutures));
}
根据@Gili的回答,我编写了我正在使用的这段代码 - 我认为它基本相同,但易于阅读,并且可能在非等待部分花费更少的时间:
/**
* Wait for all of the futures to complete successfully unless at least one of
* them has failed (completed with an exception) - then complete
* immediately with the first exception received.
* @param futures list of completable futures to wait on
* @return a future that will complete successfully as soon as all submitted
* futures has completed successfully, or will complete exceptionally as soon
* as one of the submitted futures has completed exceptionally.
*/
public static CompletableFuture<Void> allOfUnlessFailed(
CompletableFuture<?> ...futures) {
ConcurrentLinkedQueue<CompletableFuture<?>> queue =
new ConcurrentLinkedQueue<>();
for (CompletableFuture<?> f : futures) {
queue.add(f);
f.thenRun(() -> queue.remove(f));
}
return allOfUnlessFailed(queue);
}
private static CompletableFuture<Void> allOfUnlessFailed(
ConcurrentLinkedQueue<CompletableFuture<?>> queue) {
CompletableFuture<?>[] rest = queue.toArray(new CompletableFuture[] {});
if (rest.length == 0)
return CompletableFuture.completedFuture(null);
return CompletableFuture.anyOf(rest)
.thenCompose(__ -> allOfUnlessFailed(queue));
}