我正在尝试以每个工作者2秒钟的超时停止许多Worker
实例,我希望其中一些实例会失败。我想实现以下目标,但我知道这是不好的方法。我不能使用CompletableFuture.allOf(),因为它会在第一次失败时停止。我也从未使用过CompletableFutures。我尝试使用Executors.newFixedThreadPool(3).invokeAll()没有成功。
public interface Worker {
public String workerId();
public CompletableFuture<Worker> stop();
}
List<Worker> workers;
public stopAll() {
workers.stream()
.parallel()
.map(worker ->
try {
worker.stop().get(2, TimeUnit.SECONDS)
} catch(InterruptedException | java.util.concurrent.ExecutionException | TimeoutException e){
log.error("Worker {} failed to stop", worker.workerId())
}
}
我的目标是:
stopAll()方法大约需要2秒钟才能完成
这次没有停止的所有工作人员都由ID记录
有人有任何建议吗?感谢您的帮助。
Java没有一种很棒的方法来等待一堆超时(我知道):
public class TryWaitForCF {
private void run() throws Exception {
List<Worker> workers = new ArrayList<>();
for( int i=0; i<10; i++){
workers.add(new Worker("Worker="+i));
}
List<CompletableFuture<Worker>> waitList = new ArrayList<>();
workers.forEach(worker -> waitList.add(worker.stop()));
long endTime = System.currentTimeMillis() + 2000;
for( CompletableFuture<Worker> cf : waitList ){
long timeout = endTime - System.currentTimeMillis();
if( timeout < 0 ){
timeout = 0;
}
System.out.println("Get result waiting at most: " + timeout + " ms");
try {
Worker result = cf.get(timeout, TimeUnit.MILLISECONDS);
System.out.println(result.name + ": finished" );
}
catch( Exception e ){
System.out.println("Failed to get result: " + e.getMessage());
}
}
}
public static void main(String[] args) throws Exception {
new TryWaitForCF().run();
}
static class Worker {
private String name;
private long timeToComplete;
public Worker(String name){
this.name = name;
this.timeToComplete = (long) (Math.random() * (3000 - 1000 + 1) + 1000);
System.out.println(name + ": timeToComplete=" + timeToComplete);
}
public CompletableFuture<Worker> stop() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(timeToComplete);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return this;
});
}
}
}
结果:
Worker=0: timeToComplete=1018
Worker=1: timeToComplete=1866
Worker=2: timeToComplete=1894
Worker=3: timeToComplete=2041
Worker=4: timeToComplete=1124
Worker=5: timeToComplete=1613
Worker=6: timeToComplete=2445
Worker=7: timeToComplete=2188
Worker=8: timeToComplete=2129
Worker=9: timeToComplete=2174
Get result waiting at most: 2000 ms
Worker=0: finished
Get result waiting at most: 980 ms
Worker=1: finished
Get result waiting at most: 130 ms
Worker=2: finished
Get result waiting at most: 100 ms
Failed to get result: null
Get result waiting at most: 0 ms
Worker=4: finished
Get result waiting at most: 0 ms
Worker=5: finished
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
不过,您可能想更改设计。
没有实际问题,正如您的主张,“ 我无法使用CompletableFuture.allOf(),因为它会在第一次失败时停止”是错误的。如果至少一个输入期货已异常完成,则allOf
返回的期货将异常完成,但仍将在所有期货均已完成后才完成。可以很容易地证明:
CompletableFuture<?> f1 = new CompletableFuture<>();
f1.completeExceptionally(new Throwable("fail immediately"));
CompletableFuture<?> f2
= CompletableFuture.runAsync(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)));
CompletableFuture<?> all = CompletableFuture.allOf(f1, f2);
long t0 = System.nanoTime();
try {
all.join();
} finally {
System.err.println("Completed: "+f1.isDone()+", "+f2.isDone());
System.err.printf("%.2fs%n", (System.nanoTime()-t0)*1e-9);
}
Completed: true, true
2,00s
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.Throwable: fail immediately
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1284)
at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1632)
at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1618)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.Throwable: fail immediately
at Demo.main(Demo.java:16)
因此,即使某些作业失败,也可以使用allOf
检查所有作业的完成状态:
ExecutorService e = Executors.newFixedThreadPool(20);
Random r = ThreadLocalRandom.current();
CompletableFuture<?>[] workerJobs = IntStream.range(0, 20)
.mapToObj(i -> {
long time = TimeUnit.MILLISECONDS.toNanos(r.nextInt(4000));
boolean fail = r.nextBoolean();
return CompletableFuture.runAsync(() -> {
LockSupport.parkNanos(time);
if(fail) throw new RuntimeException();
}, e);
})
.toArray(CompletableFuture<?>[]::new);
e.shutdown();
try {
CompletableFuture.allOf(workerJobs).get(2, TimeUnit.SECONDS);
System.out.println("All completed within 2 seconds or less without failures");
}
catch(InterruptedException ex) {
throw new AssertionError(ex);
}
catch(ExecutionException ex) {
System.out.println("All completed within 2 seconds or less, at least one failed");
}
catch(TimeoutException ex) {
System.out.println("At least one did not complete within 2 seconds");
}
for(CompletableFuture<?> f: workerJobs) {
System.out.println(f.isDone()? "completed"
+(f.isCompletedExceptionally()? " exceptionally": ""): "not completed");
}