如何子类化CompletableFuture?

问题描述 投票:0回答:5

我想对 CompletableFuture 进行子类化以覆盖默认的

Executor
。也就是说,如果用户在未指定
Executor
的情况下调用方法,我希望使用我自己的
Executor
,而不是
CompletableFuture
通常使用的方法。

Javadoc 暗示了子类化的可能性:

所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一种方法的行为不会受到子类中其他方法的重写的影响。

如果底层实现依赖于像

CompletableFuture.supplyAsync()
这样的包私有方法,我该如何在子类中实现像
internalComplete()
这样的静态方法?

如何子类化 CompletableFuture?


我正在努力做什么...

我的用户代码需要使用同一个执行器异步执行多个任务。例如:

CompletableFuture.supplyAsync(..., executor).thenApplyAsync(..., executor).thenApplyAsync(..., executor)
。我希望自定义
CompletableFuture
实现在所有后续调用中使用第一个执行器。

java java-8 future
5个回答
8
投票

从 Java 9 开始,对此任务有内置支持:

public class MyCompletableFuture<T> extends CompletableFuture<T> {

    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> s, Executor e) {
        Objects.requireNonNull(s);
        Objects.requireNonNull(e);
        return new MyCompletableFuture<T>(e).completeAsync(s);
    }

    private final Executor executor;

    public MyCompletableFuture(Executor executor) {
        this.executor = executor;
    }

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MyCompletableFuture<>(executor);
    }

    @Override
    public Executor defaultExecutor() {
        return executor;
    }
}
  • 通过重写

    newIncompleteFuture()
    ,我们可以确保所有
    then…
    when…
    等方法都将返回
    MyCompletableFuture
    的实例,而无需重写它们中的每一个。

  • defaultExecutor()
    方法指定所有不带
    Executor
    参数的异步方法使用的执行器。

  • 方便的是,非

    static
    方法
    completeAsync
    允许我们使用未来实现的实例来获得
    supplyAsync
    行为。上面的例子提供了一种
    static
    方法
    supplyAsync
    ,可以与原始方法类似地使用,即
    MyCompletableFuture.supplyAsync(supplier, executor)


在 Java 8 中,您必须使用装饰模式重写要更改其行为的所有方法,这仍然不需要触及

CompletableFuture
的任何内部工作原理。

import java.util.concurrent.*;
import java.util.function.*;

public class MyCompletableFuture<T> extends CompletableFuture<T> {
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> s, Executor e){
        return my(CompletableFuture.supplyAsync(s, e), e);
    }
    private static <T> CompletableFuture<T> my(CompletableFuture<T> f,Executor e){
        MyCompletableFuture<T> my=new MyCompletableFuture<>(f, e);
        f.whenComplete((v,t)-> {
            if(t!=null) my.completeExceptionally(t); else my.complete(v);
        });
        return my;
    }
    private final CompletableFuture<T> baseFuture;
    private final Executor executor;

    MyCompletableFuture(CompletableFuture<T> base, Executor e) {
        baseFuture=base;
        executor=e;
    }
    private <T> CompletableFuture<T> my(CompletableFuture<T> base) {
        return my(base, executor);
    }
    @Override
    public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action) {
        return my(baseFuture.acceptEitherAsync(other, action, executor));
    }
    @Override
    public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return my(baseFuture.applyToEitherAsync(other, fn, executor));
    }
    @Override
    public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn) {
        return my(baseFuture.handleAsync(fn, executor));
    }
    @Override
    public CompletableFuture<Void> runAfterBothAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterBothAsync(other, action, executor));
    }
    @Override
    public CompletableFuture<Void> runAfterEitherAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterEitherAsync(other, action, executor));
    }
    @Override
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return my(baseFuture.thenAcceptAsync(action, executor));
    }
    @Override
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return my(baseFuture.thenAcceptBothAsync(other, action, executor));
    }
    @Override
    public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T, ? extends U> fn) {
        return my(baseFuture.thenApplyAsync(fn, executor));
    }
    @Override
    public <U, V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T, ? super U, ? extends V> fn) {
        return my(baseFuture.thenCombineAsync(other, fn, executor));
    }
    @Override
    public <U> CompletableFuture<U> thenComposeAsync(
            Function<? super T, ? extends CompletionStage<U>> fn) {
        return my(baseFuture.thenComposeAsync(fn, executor));
    }
    @Override
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return my(baseFuture.thenRunAsync(action, executor));
    }
    @Override
    public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action) {
        return my(baseFuture.whenCompleteAsync(action, executor));
    }
}

这是一个简单的测试用例,表明它按预期工作:

ScheduledExecutorService ses=Executors.newSingleThreadScheduledExecutor();
Executor e=r -> {
    System.out.println("adding delay");
    ses.schedule(r, 2, TimeUnit.SECONDS);
};
MyCompletableFuture.supplyAsync(()->"initial value", e)
  .thenApplyAsync(String::hashCode)
  .thenApplyAsync(Integer::toOctalString)
  .thenAcceptAsync(System.out::println);

1
投票

回答最初的问题——如何子类化 CompletableFuture?

可能从干净的 CompletionStage 接口实现(而不是从具体的 CompletableFuture 类)开始是一个更好的选择。请看这里:

https://github.com/lukas-krecan/completion-stage


0
投票

据我所知,您需要自己的

internalComplete()
实现,它在 null 或异常的情况下实例化您自己的
AltResult
的等效项。并且您的子类
supplyAsync()
应该返回新
CompletableFuture
子类的新实例。


0
投票

您的目标是限制访问网络服务的速率。您的解决方案是使用会引入延迟的特殊执行器。这不是最好的解决方案,因为链中的某些功能可能会花费一些时间进行其他活动,或者只是在操作系统级别的处理器队列中等待太长时间,因此在访问 Web 服务之前不需要延迟,但会无论如何都会被执行者拖延。延迟应该嵌入到实际执行 Web 服务请求的模块中。它应该记住上一个请求的时间,如果它太小,只需在通过套接字发送请求之前调用 Thread.sleep() 即可。这样你就不需要任何 CompletableFuture 并可以直接调用函数,如 f3(f1(f0)))。至少它更具可读性。


-3
投票

异步编程被广泛误解。理论方法是将异步程序表示为 Petri 网,其中主要组件是标记、位置和转换。从编程的角度来看,转换应该进一步分为触发规则和操作。逻辑链显得太长,程序员更喜欢过于简单的“事件反应”方案,这在任何复杂的情况下都不起作用。 CompletableFuture 的限制更加严格,其方案为“动作 - 原始转换 - 动作”。

在您的情况下,每个阶段都必须从两个来源获取:前一阶段的结果和允许访问 Web 服务的资源令牌。 CompletableFuture 根本不适合这种情况。我建议首先为您的案例绘制 Petri 网,然后使用具有同步方法的类从头开始实现基本的基础设施。或者尝试我的异步库df4j2,它允许在多个位置构建过渡。

© www.soinside.com 2019 - 2024. All rights reserved.