具有超时功能的可完成的未来

问题描述 投票:1回答:2

我是新的可持续未来。我试图为一个元素列表(它们是参数)调用一个并行方法,然后将结果组合起来创建一个最终响应。我也试图设置50毫秒的超时,这样如果调用在50毫秒内没有返回,我将返回一个默认值。

到目前为止,我试过这个:

    {

     List<ItemGroup> result = Collections.synchronizedList(Lists.newArrayList());

    try {
     List<CompletableFuture> completableFutures = response.getItemGroupList().stream()
     .map(inPutItemGroup -> 
       CompletableFuture.runAsync(() -> {
           final ItemGroup itemGroup = getUpdatedItemGroup(inPutItemGroup);               //call which I am tryin to make parallel

           // this is thread safe
           if (null != itemGroup) {
                  result.add(itemGroup); //output of the call
           }
        }, executorService).acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS),inPutItemGroup))  //this line throws error     
     .collect(Collectors.toList());

// this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
                        .join();
} catch (final Throwable t) {
     final String errorMsg = String.format("Exception occurred while rexecuting parallel call");
                log.error(errorMsg, e);
                result = response.getItemGroupList(); //default value - return the input value if error
    }

    Response finalResponse = Response.builder()
                    .itemGroupList(result)
                    .build();

    }

     private <T> CompletableFuture<T> timeoutAfter(final long timeout, final TimeUnit unit) {
            CompletableFuture<T> result = new CompletableFuture<T>();

            //Threadpool with 1 thread for scheduling a future that completes after a timeout
            ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
            String message = String.format("Process timed out after %s %s", timeout, unit.name().toLowerCase());
            delayer.schedule(() -> result.completeExceptionally(new TimeoutException(message)), timeout, unit);
            return result;
     }

但我一直收到错误说:

 error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
    [javac]                             itemGroup))

incompatible types: inference variable T has incompatible bounds
    [javac]                     .collect(Collectors.toList());
    [javac]                             ^
    [javac]     equality constraints: CompletableFuture
    [javac]     lower bounds: Object
    [javac]   where T is a type-variable:

有人可以告诉我这里我做错了什么吗?如果我走向错误的方向,请纠正我。

谢谢。

java multithreading java-8 executorservice completable-future
2个回答
2
投票

代替

acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))

你需要的

applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)

编译代码。 “accept”是一个消耗值但没有返回新值的动作,“apply”是一个产生新值的动作。

但是,仍然存在逻辑错误。 timeoutAfter返回的未来将异常完成,因此依赖阶段也将异常完成,无需评估函数,因此此链接方法不适合用默认值替换异常。

更糟糕的是,解决这个问题会创造一个新的未来,这个未来将来自任何一个来源未来,但这不会影响在其中一个源期货中执行的result.add(itemGroup)行动。在您的代码中,生成的future将仅用于等待完成,但不用于评估结果。因此,当超时时间结束时,您将停止等待完成,但仍可能有后台线程修改列表。

正确的逻辑是分离获取值的步骤,该值可以在超时时被默认值取代,以及将结果(获取值或默认值)添加到结果列表的步骤。然后,您可以等待所有add操作的完成。在超时时,可能仍在进行getUpdatedItemGroup评估(无法停止执行),但是它们的结果将被忽略,因此它不会影响结果列表。

值得指出的是,为每个列表元素创建一个新的ScheduledExecutorService(使用后不会关闭,更糟糕的是),这不是正确的方法。

// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
    CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
    .map(inPutItemGroup ->
        timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
            CompletableFuture.supplyAsync(
                () -> getUpdatedItemGroup(inPutItemGroup), executorService),
            inPutItemGroup)
         .thenAccept(itemGroup -> {
            // this is thread safe, but questionable,
            // e.g. the result list order is not maintained
            if(null != itemGroup) result.add(itemGroup);
         })
    )
    .toArray(CompletableFuture<?>[]::new);

    // this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
    String errorMsg = String.format("Exception occurred while executing parallel call");
    log.error(errorMsg, e);
    endResult = response.getItemGroupList();
}
finally {
    delayer.shutdown();
}

Response finalResponse = Response.builder()
    .itemGroupList(endResult)
    .build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
    long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {

    es.schedule(() -> f.complete(value), timeout, unit);
    return f;
}

在这里,supplyAsync产生一个CompletableFuture,它将提供getUpdatedItemGroup评估的结果。 timeoutAfter调用将在超时后使用默认值计划完成,而不创建新的未来,然后,通过thenAccept链接的依赖操作将结果值添加到result列表。

请注意,synchronizedList允许从多个线程添加元素,但是从多个线程添加将导致不可预测的顺序,与源列表的顺序无关。


0
投票

acceptEither的签名如下:

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, 
    Consumer<? super T> action
) {

抛出错误的行看起来像这样:

.acceptEither(
     timeoutAfter(50, TimeUnit.MILLISECONDS),
     inPutItemGroup
)

所以你看到你试图传递一个ItemGroup作为Consumer<? super T> T被推测为Void因此你得到预期的错误:

error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
© www.soinside.com 2019 - 2024. All rights reserved.