Mono vs CompletableFuture

问题描述 投票:7回答:1

CompletableFuture在单独的线程上执行任务(使用线程池)并提供回调函数。假设我在CompletableFuture中有一个API调用。那是一个API调用阻塞吗?线程是否会被阻塞,直到它没有得到API的响应? (我知道主线程/ tomcat线程将是非阻塞的,但是CompletableFuture任务正在执行的线程呢?)

据我所知,单声道完全没有阻挡。

如果我错了,请详细说明并纠正我。

reactive-programming project-reactor
1个回答
12
投票

CompletableFuture is Async. But is it non-blocking?

关于CompletableFuture的一个原因是它是真正的异步,它允许您从调用者线程异步运行任务,而thenXXX等API允许您在结果可用时处理结果。另一方面,CompletableFuture并不总是非阻塞。例如,当您运行以下代码时,它将在默认的ForkJoinPool上异步执行:

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
});

很明显,执行任务的Thread中的ForkJoinPool最终将被阻止,这意味着我们无法保证该呼叫将是非阻塞的。

另一方面,CompletableFuture公开了API,它允许你使它真正无阻塞。

例如,您始终可以执行以下操作:

public CompletableFuture myNonBlockingHttpCall(Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> {
        if(exception != null) {
            uncompletedFuture.completeExceptionally(exception);
            return;
        }
        uncompletedFuture.complete(result);
    })

    return uncompletedFuture;
}

正如您所看到的,CompletableFuture future的API为您提供了completecompleteExceptionally方法,可以在需要时完成您的执行而不会阻塞任何线程。

Mono vs CompletableFuture

在上一节中,我们概述了CF行为,但CompletableFuture和Mono之间的核心区别是什么?

值得一提的是,我们也可以阻止Mono。没有人阻止我们写下以下内容:

Mono.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
})

当然,一旦我们订阅了未来,调用者线程将被阻止。但我们可以通过提供额外的subscribeOn运算符来解决这个问题。尽管如此,Mono更广泛的API并不是关键的未来。

为了理解CompletableFutureMono之间的主要区别,让我们回到之前提到的myNonBlockingHttpCall方法实现。

public CompletableFuture myUpperLevelBusinessLogic() {
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    }

   return future;
}

CompletableFuture的情况下,一旦调用该方法,它将急切地执行对另一个服务/资源的HTTP调用。即使在验证一些前/后条件后我们确实不需要执行结果,它也会开始执行,并且将为此工作分配额外的CPU / DB-Connections / What-Ever-Machine-Resources。

相反,根据定义,Mono类型是懒惰的:

public Mono myNonBlockingHttpCallWithMono(Object someData) {
    return Mono.create(sink -> {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            })
    });
} 

public Mono myUpperLevelBusinessLogic() {
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    }

   return mono;
}

在这种情况下,在订阅最终的mono之前不会发生任何事情。因此,只有当Mono方法返回的myNonBlockingHttpCallWithMono将被订阅时,才会执行提供给Mono.create(Consumer)的逻辑。

我们可以走得更远。我们可以让我们的执行更加懒散。您可能知道,Mono从Reactive Streams规范扩展了Publisher。 Reactive Streams的尖叫未来是背压支持。因此,使用Mono API,我们只有在真正需要数据时才能执行,并且我们的订户已准备好使用它们:

Mono.create(sink -> {
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> {
        if(!once.get() && once.compareAndSet(false, true) {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            });
        }
    });
});

在这个例子中,我们仅在订户调用Subscription#request时才执行数据,因此它声明它已准备好接收数据。

Summary

  • CompletableFuture是异步的,可以是非阻塞的
  • CompletableFuture很渴望。你不能推迟执行。但你可以取消它们(这比没有好)
  • Mono是异步/非阻塞的,可以通过使用不同的运算符组合主Thread,轻松地在不同的Mono上执行任何调用。
  • Mono是真正的懒惰,允许通过订户存在推迟执行启动并准备消耗数据。
© www.soinside.com 2019 - 2024. All rights reserved.