我正在玩Java 8可完成的期货。我有以下代码:
CountDownLatch waitLatch = new CountDownLatch(1);
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("Wait");
waitLatch.await(); //cancel should interrupt
System.out.println("Done");
} catch (InterruptedException e) {
System.out.println("Interrupted");
throw new RuntimeException(e);
}
});
sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");
assertTrue(future.isCancelled());
assertTrue(future.isDone());
sleep(100); //give it some time to finish
使用runAsync我计划执行等待锁存器的代码。接下来我取消了未来,期望被抛入中断的异常。但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException。使用ExecutorService的等效代码按预期工作。它是CompletableFuture中的错误还是我的示例中的错误?
显然,这是故意的。 CompletableFuture::cancel方法的Javadoc声明:
[参数:] mayInterruptIfRunning - 此值在此实现中无效,因为中断不用于控制处理。
有趣的是,方法ForkJoinTask::cancel对参数mayInterruptIfRunning使用几乎相同的措辞。
我猜这个问题:
CompletableFuture应该创建一个新的CompletionStage,而不是阻塞,而cpu绑定任务是fork-join模型的先决条件。因此,使用其中任何一个中断都会失败他们的目的。而另一方面,它可能会增加复杂性,如果按预期使用则不需要。
当您调用CompletableFuture#cancel
时,您只会停止链的下游部分。上游部分,i。即最终会调用complete(...)
或completeExceptionally(...)
的东西,没有任何信号表明结果不再需要。
什么是'上游'和'下游'的东西?
我们考虑以下代码:
CompletableFuture
.supplyAsync(() -> "hello") //1
.thenApply(s -> s + " world!") //2
.thenAccept(s -> System.out.println(s)); //3
在这里,数据从上到下流动 - 从供应商创建,到功能修改,再到println
消费。上述特定步骤的部分称为上游部分,下部部分称为下游部分。 E. g。步骤1和2是步骤3的上游。
这是幕后发生的事情。这不是精确的,而是一个方便的思维模型。
ForkJoinPool
内)。complete(...)
传递到下一个CompletableFuture
下游。CompletableFuture
调用下一步 - 一个函数(步骤2),它接收前一步结果并返回一些将进一步传递给下游CompletableFuture
的complete(...)
。CompletableFuture
调用消费者System.out.println(s)
。消费者完成后,下游的CompletableFuture
将获得它的价值,(Void) null
正如我们所看到的,这个链中的每个CompletableFuture
必须知道下游有谁在等待将值传递给他们的complete(...)
(或completeExceptionally(...)
)。但是CompletableFuture
不必知道它的上游(或上游 - 可能有几个)。
因此,在步骤3中调用cancel()
不会中止步骤1和2,因为从步骤3到步骤2没有链接。
假设你正在使用CompletableFuture
,那么你的步数就足够小了,如果执行几个额外的步骤就没有坏处。
如果要将取消传播到上游,您有两种选择:
CompletableFuture
(命名为cancelled
),在每一步之后检查(类似于step.applyToEither(cancelled, Function.identity())
)您需要CompletionStage的替代实现来完成真正的线程中断。我刚刚发布了一个小型图书馆,正是为了这个目的 - https://github.com/vsilaev/tascalate-concurrent
CancellationException是内部ForkJoin取消例程的一部分。检索未来结果时会出现异常:
try { future.get(); }
catch (Exception e){
System.out.println(e.toString());
}
花一点时间在调试器中看到这个。 JavaDoc对于正在发生的事情或您应该期待的事情并不清楚。