CompletableFuture无法像某些Java 8流代码所接受的那样工作

问题描述 投票:0回答:1

我正在关注this SO answer

以下代码段可以正常工作。使用所有线程

摘要1

scrolledPage.stream()
    .filter(this::isUserDoesntHaveId)
    .map(this::processSingle)                                          // processSingle method return supplier
    .map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
    .collect(Collectors.toList())                                      // Collect those as list
    .stream()                                                          // then again making another stream out of that.
    .map(CompletableFuture::join)
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

我对这段代码感到困惑

    .map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
    .collect(Collectors.toList())                                      // Collect those as list
    .stream()                                                          // then again making another stream out of that.
    .map(CompletableFuture::join)

为什么我需要收集然后再次进行收集。我已经测试过了。

摘要2

scrolledPage.stream()
    .filter(this::isUserDoesntHaveId)
    .map(this::processSingle)                                           // processSingle method return supplier
    .map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
    .map(CompletableFuture::join)               // this code is similar without the collecting part.
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

有了代码,我已经看到最后一个代码片段仅在线程池中使用一个线程。但是第一个使用每个线程。这两个代码段之间有什么区别。

java java-8 completable-future
1个回答
0
投票

第一个代码段可以分为两部分。在第一部分中,您将使用CompletableFuture将所有任务提交给异步进程,并在它立即返回Future对象时,流将处理并收集列表中的所有将来的项。

List<Future> futures = scrolledPage.stream()
    ...
    .map(task -> CompletableFuture.supplyAsync(task, executorService))
    .collect(Collectors.toList())

以及后面的部分,当您使用join时,流处理将等待结果,但是届时所有线程都将在工作。因此,它能够利用所有线程。

    futures.stream()
       .map(CompletableFuture::join)
       ...
       .collect(Collectors.toList());

流执行惰性计算意味着仅在需要时才处理元素。 (要求由终端操作触发,在这种情况下为collect。)>

在第二个片段中,map(CompletableFuture::join)使线程等待结果,然后再处理流中的下一个元素

.stream()
...
.map(task -> CompletableFuture.supplyAsync(task, executorService))
.map(CompletableFuture::join)
...
.collect(Collectors.toList())

因此,只有当第一个任务完成时,流中的任务(下一个元素)才会被处理。这将使您的任务一个接一个地依次执行。

© www.soinside.com 2019 - 2024. All rights reserved.