Java8 CompletableFuture.get(long timeout, TimeUnit unit)没有及时返回

问题描述 投票:0回答:1

我假设所有任务都会在 1000 毫秒后超时,但实际上并非如此。 它打印如下,这是怎么发生的。

future0
java.util.concurrent.TimeoutException
1010
future1
java.util.concurrent.TimeoutException
2014
future2
3015
future3
3015
future4
3016
future5
3016
future6
3016
future7
java.util.concurrent.TimeoutException
4020
future8
java.util.concurrent.TimeoutException
5021
future9
6018
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        int taskCount = 10;
        long ts = System.currentTimeMillis();
        List<CompletableFuture> list = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            list.add(future);
        }
        for (int i = 0; i < taskCount; i++) {
            System.out.println("future" + i);
            try {
                list.get(i).get(1000, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                System.out.println(e);
            } finally {
                System.out.println(System.currentTimeMillis() - ts);
            }
        }
    }

我假设当我打电话

CompletableFuture.get(long timeout, TimeUnit unit)
时,我总能及时得到结果或
TimeoutException

java completable-future
1个回答
0
投票

您正在向公共

ForkJoinPool
提交任务。分叉连接池具有尝试维持的一组并行性级别。公共池的默认并行度基于可用处理器的数量。在您的情况下,并行度设置为七。然而,由于任务的实现方式,对
sleep
的调用会阻止池生成线程以维持其并行性。因此,在任何给定时间只能同时运行七个任务。简而言之,并非所有任务都同时开始。

此外,您在每个 future

顺序
上调用 get。当主线程在调用
get
时被阻塞时,已启动的任务会继续执行。

如果您在日志中包含更多信息,您可以更好地看到这一点。

Parallelism: 7
|---------|------------|----------|-----------|---------|------------|
| Task ID | Task Start | Task End | Get Start | Get End |   Result   |
|---------|------------|----------|-----------|---------|------------|
|       0 |          6 |     3008 |         7 |    1012 | timed out  |
|       1 |          7 |     3008 |      1012 |    2026 | timed out  |
|       2 |          8 |     3008 |      2026 |    3008 | successful |
|       3 |          8 |     3010 |      3008 |    3010 | successful |
|       4 |          8 |     3010 |      3010 |    3010 | successful |
|       5 |          8 |     3026 |      3010 |    3026 | successful |
|       6 |          9 |     3026 |      3026 |    3026 | successful |
|       7 |       3008 |     6009 |      3026 |    4028 | timed out  |
|       8 |       3008 |     6009 |      4028 |    5041 | timed out  |
|       9 |       3008 |     6009 |      5041 |    6009 | successful |

注意: 开始和结束时间以毫秒为单位。值与恒定的起始时间相关。

从这些时间中,您可以看到任务 0-6 都立即开始,但任务 7-9 直到任务 0-6 完成后(大约三秒后)才开始。同时,

get
调用每秒发生一次并阻塞一秒,任务 3-8 除外,其中
get
会立即返回,因为任务已经完成。


这是给出上述输出的代码。

import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {

  public static void main(String[] args) throws Exception {
    long originNanos = System.nanoTime();

    var handles = new ArrayList<TaskHandle>();
    for (int i = 0; i < 10; i++) {
      var metrics = new TaskMetrics();
      var future = CompletableFuture.runAsync(createTask(metrics));
      handles.add(new TaskHandle(future, metrics));
    }

    for (var handle : handles) {
      try {
        handle.metrics().getCallNanos = System.nanoTime();
        handle.future().get(1_000, TimeUnit.MILLISECONDS);
      } catch (TimeoutException ex) {
        handle.metrics().timedOut = true;
      }
      handle.metrics().getReturnNanos = System.nanoTime();
    }

    System.out.println("Parallelism: " + ForkJoinPool.commonPool().getParallelism());
    System.out.println("|---------|------------|----------|-----------|---------|------------|");
    System.out.println("| Task ID | Task Start | Task End | Get Start | Get End |   Result   |");
    System.out.println("|---------|------------|----------|-----------|---------|------------|");

    var format = "| %7d | %10d | %8d | %9d | %7d | %-10s |%n";
    for (int i = 0; i < handles.size(); i++) {
      var handle = handles.get(i);
      // Ensure task is complete. Also ensures writing the
      // TaskMetrics fields happens-before reading them.
      handle.future().get();

      var metrics = handle.metrics();
      long taskStartMs = toMillis(originNanos, metrics.startNanos);
      long taskEndMs = toMillis(originNanos, metrics.endNanos);
      long getStartMs = toMillis(originNanos, metrics.getCallNanos);
      long getEndMs = toMillis(originNanos, metrics.getReturnNanos);
      var result = metrics.timedOut ? "timed out" : "successful";

      System.out.printf(format, i, taskStartMs, taskEndMs, getStartMs, getEndMs, result);
    }
  }

  static long toMillis(long fromNanos, long toNanos) {
    return Math.round((toNanos - fromNanos) / 1_000_000.0);
  }

  static Runnable createTask(TaskMetrics metrics) {
    return () -> {
      metrics.startNanos = System.nanoTime();
      try {
        Thread.sleep(3_000);
        metrics.endNanos = System.nanoTime();
      } catch (InterruptedException ex) {
        throw new CancellationException();
      }
    };
  }

  static class TaskMetrics {
    long startNanos;
    long endNanos;
    long getCallNanos;
    long getReturnNanos;
    boolean timedOut;
  }

  record TaskHandle(Future<?> future, TaskMetrics metrics) {}
}
© www.soinside.com 2019 - 2024. All rights reserved.