Calling a list of CompletableFutures with a timeout and collecting the results

Question  Votes: 1  Answers: 2

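I am trying to stop many Worker instances with a timeout of 2 seconds per worker, and I expect some of them to fail. I want to achieve something like the following, but I know this is a bad approach. I can't use CompletableFuture.allOf() because it stops at the first failure. I have also never used CompletableFutures before. I tried Executors.newFixedThreadPool(3).invokeAll() without success.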

public interface Worker {

public String workerId();
public CompletableFuture<Worker> stop();
}

List<Worker> workers;

public void stopAll() {
    workers.stream()
        .parallel()
        // forEach rather than map: nothing is returned, each worker is only stopped
        .forEach(worker -> {
            try {
                worker.stop().get(2, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.error("Worker {} failed to stop", worker.workerId());
            }
        });
}

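My goals are:

  1. The stopAll() method takes about 2 seconds to complete.

  2. All workers that did not stop within that time are logged by ID.

Does anyone have any suggestions? Thanks for your help.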

java asynchronous java-8 concurrency completable-future
2 Answers
0
votes

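Java doesn't have a great way to wait on a bunch of futures with a single timeout (that I know of), so this loop gives every get() call only the time remaining until a shared deadline: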

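import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TryWaitForCF {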

private void run() throws Exception {

    List<Worker> workers = new ArrayList<>();
    for( int i=0; i<10; i++){
        workers.add(new Worker("Worker="+i));
    }

    List<CompletableFuture<Worker>> waitList = new ArrayList<>();
    workers.forEach(worker -> waitList.add(worker.stop()));

    long endTime = System.currentTimeMillis() + 2000;
    for( CompletableFuture<Worker> cf : waitList ){
        long timeout = endTime - System.currentTimeMillis();
        if( timeout < 0 ){
            timeout = 0;
        }
        System.out.println("Get result waiting at most: " + timeout + " ms");
        try {
            Worker result = cf.get(timeout, TimeUnit.MILLISECONDS);
            System.out.println(result.name  + ": finished" );
        }
        catch( Exception e ){
            System.out.println("Failed to get result: " + e.getMessage());
        }
    }

}

public static void main(String[] args) throws Exception {
    new TryWaitForCF().run();
}

static class Worker {
    private String name;
    private long timeToComplete;

    public Worker(String name){
        this.name = name;
        this.timeToComplete = (long) (Math.random() * (3000 - 1000 + 1) + 1000);
        System.out.println(name  + ": timeToComplete=" + timeToComplete);
    }

    public CompletableFuture<Worker> stop() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(timeToComplete);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return this;
        });
    }
}

}

Result:

Worker=0: timeToComplete=1018
Worker=1: timeToComplete=1866
Worker=2: timeToComplete=1894
Worker=3: timeToComplete=2041
Worker=4: timeToComplete=1124
Worker=5: timeToComplete=1613
Worker=6: timeToComplete=2445
Worker=7: timeToComplete=2188
Worker=8: timeToComplete=2129
Worker=9: timeToComplete=2174
Get result waiting at most: 2000 ms
Worker=0: finished
Get result waiting at most: 980 ms
Worker=1: finished
Get result waiting at most: 130 ms
Worker=2: finished
Get result waiting at most: 100 ms
Failed to get result: null
Get result waiting at most: 0 ms
Worker=4: finished
Get result waiting at most: 0 ms
Worker=5: finished
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
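
To get closer to the second goal in the question (logging by ID), the same shared-deadline loop can return the IDs of the entries that failed or timed out instead of printing messages. The following is only a sketch: the class name DeadlineStop, the helper awaitAll, and the idea of keying the futures by ID in a map are assumptions made for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadlineStop {

    /**
     * Hypothetical helper: waits for every future until one shared deadline
     * and returns the ids of the entries that failed or did not finish in time.
     */
    static List<String> awaitAll(Map<String, CompletableFuture<?>> stops, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<String> notStopped = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<?>> entry : stops.entrySet()) {
            // Only the time left until the shared deadline, never less than zero.
            long remaining = Math.max(0, deadline - System.nanoTime());
            try {
                entry.getValue().get(remaining, TimeUnit.NANOSECONDS);
            } catch (TimeoutException | ExecutionException e) {
                notStopped.add(entry.getKey());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and treat the entry as not stopped.
                Thread.currentThread().interrupt();
                notStopped.add(entry.getKey());
            }
        }
        return notStopped;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<?>> stops = new LinkedHashMap<>();
        stops.put("fast", CompletableFuture.completedFuture("ok"));
        stops.put("stuck", new CompletableFuture<Object>()); // never completes
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        stops.put("failed", failed);

        System.out.println(awaitAll(stops, 200)); // prints [stuck, failed]
    }
}
```

The total wait is bounded by the timeout no matter how many futures are in the map, because each get() only receives the time that is still left.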

You might want to change the design, though.


0
votes

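There is no actual problem, because your claim, "I can't use CompletableFuture.allOf() because it stops at the first failure", is wrong. If at least one of the input futures completes exceptionally, the future returned by allOf also completes exceptionally, but it still completes only after all of the input futures have completed. This is easy to demonstrate: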

CompletableFuture<?> f1 = new CompletableFuture<>();
f1.completeExceptionally(new Throwable("fail immediately"));
CompletableFuture<?> f2
  = CompletableFuture.runAsync(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)));
CompletableFuture<?> all = CompletableFuture.allOf(f1, f2);

long t0 = System.nanoTime();
try {
    all.join();
} finally {
    System.err.println("Completed: "+f1.isDone()+", "+f2.isDone());
    System.err.printf("%.2fs%n", (System.nanoTime()-t0)*1e-9);
}
Completed: true, true
2,00s
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.Throwable: fail immediately
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1284)
    at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1632)
    at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1618)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.Throwable: fail immediately
    at Demo.main(Demo.java:16)

So you can use allOf to check the completion status of all jobs, even when some of them failed:

ExecutorService e = Executors.newFixedThreadPool(20);
Random r = ThreadLocalRandom.current();
CompletableFuture<?>[] workerJobs = IntStream.range(0, 20)
    .mapToObj(i -> {
      long time = TimeUnit.MILLISECONDS.toNanos(r.nextInt(4000));
      boolean fail = r.nextBoolean();
      return CompletableFuture.runAsync(() -> {
        LockSupport.parkNanos(time);
        if(fail) throw new RuntimeException();
      }, e);
    })
    .toArray(CompletableFuture<?>[]::new);
e.shutdown();

try {
  CompletableFuture.allOf(workerJobs).get(2, TimeUnit.SECONDS);
  System.out.println("All completed within 2 seconds or less without failures");
}
catch(InterruptedException ex) {
  throw new AssertionError(ex);
}
catch(ExecutionException ex) {
  System.out.println("All completed within 2 seconds or less, at least one failed");
}
catch(TimeoutException ex) {
  System.out.println("At least one did not complete within 2 seconds");
}
for(CompletableFuture<?> f: workerJobs) {
  System.out.println(f.isDone()? "completed"
    +(f.isCompletedExceptionally()? " exceptionally": ""): "not completed");
}
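
Applied to the Worker interface from the question, the same idea might look roughly like the sketch below. The Worker interface is taken from the question; the class name StopAllSketch, the fake(...) test helper, and returning the IDs as a list (rather than logging them) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StopAllSketch {

    /** The interface from the question. */
    interface Worker {
        String workerId();
        CompletableFuture<Worker> stop();
    }

    /** Starts all stops, waits at most timeoutMillis, returns ids that failed or timed out. */
    static List<String> stopAll(List<Worker> workers, long timeoutMillis) {
        // Trigger every stop first so that all of them run concurrently.
        Map<Worker, CompletableFuture<Worker>> stops = new LinkedHashMap<>();
        for (Worker w : workers) {
            stops.put(w, w.stop());
        }
        CompletableFuture<Void> all =
            CompletableFuture.allOf(stops.values().toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            // Expected when a stop failed or was too slow; each future is inspected below.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> notStopped = new ArrayList<>();
        stops.forEach((w, f) -> {
            if (!f.isDone() || f.isCompletedExceptionally()) {
                notStopped.add(w.workerId());
            }
        });
        return notStopped;
    }

    /** Hypothetical test worker: stops after delayMillis, optionally failing. */
    static Worker fake(String id, long delayMillis, boolean fail) {
        return new Worker() {
            public String workerId() { return id; }
            public CompletableFuture<Worker> stop() {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException e) {
                        throw new IllegalStateException(e);
                    }
                    if (fail) throw new IllegalStateException("stop failed");
                    return this;
                });
            }
        };
    }

    public static void main(String[] args) {
        List<Worker> workers = Arrays.asList(
            fake("ok", 10, false), fake("failing", 10, true), fake("slow", 2000, false));
        System.out.println(stopAll(workers, 500)); // prints [failing, slow]
    }
}
```

In real code the returned IDs would be passed to the logger, as in the question's log.error call. Note that a timed-out stop() is not cancelled here; its future simply keeps running in the background.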