我正在关注this SO answer
以下代码段可以正常工作。使用所有线程
scrolledPage.stream()
.filter(this::isUserDoesntHaveId)
.map(this::processSingle) // processSingle method return supplier
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.collect(Collectors.toList()) // Collect those as list
.stream() // then again making another stream out of that.
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
我对这段代码感到困惑
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.collect(Collectors.toList()) // Collect those as list
.stream() // then again making another stream out of that.
.map(CompletableFuture::join)
为什么我需要收集然后再次进行收集。我已经测试过了。
scrolledPage.stream()
.filter(this::isUserDoesntHaveId)
.map(this::processSingle) // processSingle method return supplier
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.map(CompletableFuture::join) // this code is similar without the collecting part.
.filter(Objects::nonNull)
.collect(Collectors.toList());
有了代码,我已经看到最后一个代码片段仅在线程池中使用一个线程。但是第一个使用每个线程。这两个代码段之间有什么区别。
第一个代码段可以分为两部分。在第一部分中,您将使用CompletableFuture
将所有任务提交给异步进程,并在它立即返回Future
对象时,流将处理并收集列表中的所有将来的项。
List<Future> futures = scrolledPage.stream()
...
.map(task -> CompletableFuture.supplyAsync(task, executorService))
.collect(Collectors.toList())
以及后面的部分,当您使用join
时,流处理将等待结果,但是届时所有线程都将在工作。因此,它能够利用所有线程。
futures.stream()
.map(CompletableFuture::join)
...
.collect(Collectors.toList());
流执行惰性计算意味着仅在需要时才处理元素。 (要求由终端操作触发,在这种情况下为collect
。)>
在第二个片段中,map(CompletableFuture::join)
使线程等待结果,然后再处理流中的下一个元素
.stream() ... .map(task -> CompletableFuture.supplyAsync(task, executorService)) .map(CompletableFuture::join) ... .collect(Collectors.toList())
因此,只有当第一个任务完成时,流中的任务(下一个元素)才会被处理。这将使您的任务一个接一个地依次执行。