在Scala中,如果关键的Future已经完成,如何丢弃其他Future?

问题描述 投票:1回答:1

假设我有三个远程调用来构建我的页面。其中一个(X)对页面来说是关键的,另外两个(A,B)只是用来增强体验的。

因为 criticalFutureX 太重要了,不能受 futureAfutureB所以我希望所有远程调用的总体延迟不超过X。

这意味着,在 criticalFutureX 完成后,我想抛弃 futureAfutureB.

val criticalFutureX = ...
val futureA = ...
val futureB = ...

// the overall latency of this for-comprehension depends on the longest among X, A and B
for {
  x <- criticalFutureX
  a <- futureA
  b <- futureB
} ...

在上面的例子中,尽管它们是并行执行的,但总体延迟取决于X、A和B中最长的一个,这不是我想要的。

Latencies:
X: |----------|
A: |---------------|
B: |---|

O: |---------------| (overall latency)

第一完成的 但是不能明确的说 "在完成criticalFutureX的情况下"。

有没有类似下面的东西?

val criticalFutureX = ...
val futureA = ...
val futureB = ...

for {
  x <- criticalFutureX
  a <- futureA // discard when criticalFutureX finished
  b <- futureB // discard when criticalFutureX finished
} ...

X: |----------|
A: |-----------... discarded
B: |---|

O: |----------| (overall latency)
scala asynchronous future
1个回答
4
投票

你可以通过一个承诺来实现


  def completeOnMain[A, B](main: Future[A], secondary: Future[B]) = {
    val promise = Promise[Option[B]]()
    main.onComplete {
      case Failure(_) =>
      case Success(_) => promise.trySuccess(None)
    }
    secondary.onComplete {
      case Failure(exception) => promise.tryFailure(exception)
      case Success(value)     => promise.trySuccess(Option(value))
    }
    promise.future
  }

一些测试代码

  private def runFor(first: Int, second: Int) = {

    def run(millis: Int) = Future {
      Thread.sleep(millis);
      millis
    }

    val start = System.currentTimeMillis()
    val combined = for {
      _ <- Future.unit
      f1 = run(first)
      f2 = completeOnMain(f1, run(second))
      r1 <- f1
      r2 <- f2
    } yield (r1, r2)

    val result = Await.result(combined, 10.seconds)
    println(s"It took: ${System.currentTimeMillis() - start}: $result")
  }

  runFor(3000, 4000)
  runFor(3000, 1000)

产量

It took: 3131: (3000,None)
It took: 3001: (3000,Some(1000))

0
投票

这种任务很难用Scala标准库Futures高效、可靠、安全地实现。没有办法中断一个 Future 还没有完成,这意味着即使你选择忽略它的结果,它仍然会继续运行,浪费内存和CPU时间。而且即使有一种方法可以中断一个正在运行的 Future但是,没有办法确保被分配的资源(网络连接、打开的文件等)会被正确释放。

我想指出的是,Ivan Stanislavciuc给出的实现有一个bug:假如 main 未来失败,那么承诺将永远无法完成,这不太可能是你想要的。

因此,我强烈建议你研究一下现代的并发效果系统,比如ZIO或猫的效果。这些不仅更安全、更快速,而且更容易。这里有一个用ZIO实现的,没有这个bug。

import zio.{Exit, Task}
import Function.tupled

def completeOnMain[A, B](
  main: Task[A], secondary: Task[B]): Task[(A, Exit[Throwable, B])] =
  (main.forkManaged zip secondary.forkManaged).use {
    tupled(_.join zip _.interrupt)
  }

Exit 是一个类型,描述了如何 secondary 任务结束,即成功返回一个 B 或因为一个错误(类型为 Throwable)或由于中断。

请注意,这个函数可以被赋予一个更复杂的签名,告诉你更多关于发生了什么,但我想在这里保持简单。

© www.soinside.com 2019 - 2024. All rights reserved.