假设我有三个远程调用来构建我的页面。其中一个(X)对页面来说是关键的,另外两个(A,B)只是用来增强体验的。
因为 criticalFutureX
太重要了,不能受 futureA
和 futureB
所以我希望所有远程调用的总体延迟不超过X。
这意味着,在 criticalFutureX
完成后,我想抛弃 futureA
和 futureB
.
val criticalFutureX = ...
val futureA = ...
val futureB = ...
// the overall latency of this for-comprehension depends on the longest among X, A and B
for {
x <- criticalFutureX
a <- futureA
b <- futureB
} ...
在上面的例子中,尽管它们是并行执行的,但总体延迟取决于X、A和B中最长的一个,这不是我想要的。
Latencies:
X: |----------|
A: |---------------|
B: |---|
O: |---------------| (overall latency)
有 第一完成的 但是不能明确的说 "在完成criticalFutureX的情况下"。
有没有类似下面的东西?
val criticalFutureX = ...
val futureA = ...
val futureB = ...
for {
x <- criticalFutureX
a <- futureA // discard when criticalFutureX finished
b <- futureB // discard when criticalFutureX finished
} ...
X: |----------|
A: |-----------... discarded
B: |---|
O: |----------| (overall latency)
你可以通过一个承诺来实现
def completeOnMain[A, B](main: Future[A], secondary: Future[B]) = {
val promise = Promise[Option[B]]()
main.onComplete {
case Failure(_) =>
case Success(_) => promise.trySuccess(None)
}
secondary.onComplete {
case Failure(exception) => promise.tryFailure(exception)
case Success(value) => promise.trySuccess(Option(value))
}
promise.future
}
一些测试代码
private def runFor(first: Int, second: Int) = {
def run(millis: Int) = Future {
Thread.sleep(millis);
millis
}
val start = System.currentTimeMillis()
val combined = for {
_ <- Future.unit
f1 = run(first)
f2 = completeOnMain(f1, run(second))
r1 <- f1
r2 <- f2
} yield (r1, r2)
val result = Await.result(combined, 10.seconds)
println(s"It took: ${System.currentTimeMillis() - start}: $result")
}
runFor(3000, 4000)
runFor(3000, 1000)
产量
It took: 3131: (3000,None)
It took: 3001: (3000,Some(1000))
这种任务很难用Scala标准库Futures高效、可靠、安全地实现。没有办法中断一个 Future
还没有完成,这意味着即使你选择忽略它的结果,它仍然会继续运行,浪费内存和CPU时间。而且即使有一种方法可以中断一个正在运行的 Future
但是,没有办法确保被分配的资源(网络连接、打开的文件等)会被正确释放。
我想指出的是,Ivan Stanislavciuc给出的实现有一个bug:假如 main
未来失败,那么承诺将永远无法完成,这不太可能是你想要的。
因此,我强烈建议你研究一下现代的并发效果系统,比如ZIO或猫的效果。这些不仅更安全、更快速,而且更容易。这里有一个用ZIO实现的,没有这个bug。
import zio.{Exit, Task}
import Function.tupled
def completeOnMain[A, B](
main: Task[A], secondary: Task[B]): Task[(A, Exit[Throwable, B])] =
(main.forkManaged zip secondary.forkManaged).use {
tupled(_.join zip _.interrupt)
}
Exit
是一个类型,描述了如何 secondary
任务结束,即成功返回一个 B
或因为一个错误(类型为 Throwable
)或由于中断。
请注意,这个函数可以被赋予一个更复杂的签名,告诉你更多关于发生了什么,但我想在这里保持简单。