Scala: process futures in batches, ordered by (approximate) completion time

// I have hundreds of tasks converting inputs into outputs, which should be persisted.

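import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global // EC needed by Future(...), Future.sequence, foreach
import scala.concurrent.duration.Duration
import scala.util.Random

case class Op(i: Int)
case class Output(i: Int)
val inputs: Seq[Op] = ??? // Number of inputs is huge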

def executeLongRunning(op: Op): Output = {
  Thread.sleep(Random.nextInt(1000) + 1000) // I cannot predict which tasks will finish first
  println(s"<== $op") // interpolate instead of relying on auto-tupling of println's argument
  Output(op.i)
}
def executeSingleThreadedSave(outputs: Seq[Output]): Unit = {
  synchronized { // Problem is, persisting output is itself a long-running process, 
                 // which cannot be parallelized (internally uses blocking queue).
    Thread.sleep(5000) // persist time is independent of outputs.size
    println(s"==> $outputs") // Order of persisted records does not matter
  }
}

// TODO: this needs to be implemented
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = ??? 

val eventualOutputs: Seq[Future[Output]] = inputs.map((input: Op) => Future(executeLongRunning(input)))

magicSaver(eventualOutputs, executeSingleThreadedSave)

I could implement magicSaver as:

def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  saver(Await.result(Future.sequence(eventualOutputs), Duration.Inf))
}

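But this has a big drawback: persisting only starts after all inputs have been processed, which is not ideal from a fault-tolerance point of view (if a single future fails, Future.sequence fails and none of the finished outputs get saved).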

Another possible implementation is:

def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  // Future.onSuccess is deprecated (and gone in Scala 2.13); Future.foreach runs the callback on success
  eventualOutputs.foreach(_.foreach(output => saver(Seq(output))))
}

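But this blows the execution time up to at least inputs.size * 5 seconds (because the saver is synchronized, the calls run one after another), which is unacceptable.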
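To make the cost concrete: if every saver call takes about 5 seconds regardless of batch size and calls cannot overlap, total persist time is at least (number of saver calls) × 5 s. A small sketch of that arithmetic, assuming those two properties from the question (PersistTimeEstimate and minPersistSeconds are hypothetical names):

```scala
object PersistTimeEstimate {
  // Assumption taken from the question: one saver call costs ~5 s, independent of batch size,
  // and calls never overlap because the saver is synchronized.
  val SaveSeconds = 5

  // Lower bound on total persist time when outputs are saved in batches of `batchSize`.
  def minPersistSeconds(numOutputs: Int, batchSize: Int): Int = {
    val batches = (numOutputs + batchSize - 1) / batchSize // ceiling division
    batches * SaveSeconds
  }

  def main(args: Array[String]): Unit = {
    println(minPersistSeconds(1000, 1))   // one output per call: prints 5000
    println(minPersistSeconds(1000, 100)) // batches of 100: prints 50
  }
}
```

This is only a lower bound; it ignores how long the tasks themselves take, but it shows why the batch size dominates the persisting cost.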

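I'd like a way to batch together futures that have already completed once their number reaches some trade-off size (say 100), but I'm not sure how to do that cleanly without explicitly writing polling logic like this: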

def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  def waitFor100CompletedFutures(eventualOutputs: Seq[Future[Output]]): (Seq[Output], Seq[Future[Output]]) = {
    var completedCount: Int = 0
    do {
      completedCount = eventualOutputs.count(_.isCompleted)
      Thread.sleep(100)
    } while ((completedCount < 100) && (completedCount != eventualOutputs.size))
    val (completed: Seq[Future[Output]], remaining: Seq[Future[Output]]) = eventualOutputs.partition(_.isCompleted)
    (Await.result(Future.sequence(completed), Duration.Inf), remaining)
  }

  var remaining: Seq[Future[Output]] = eventualOutputs
  do {
    // Scala cannot assign a tuple to existing vars, so destructure into fresh vals
    val (completed, stillRunning) = waitFor100CompletedFutures(remaining)
    saver(completed)
    remaining = stillRunning
  } while (remaining.nonEmpty)
}

Is there an elegant solution?
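One polling-free way to get "save whatever has finished" batches is to have each future push its result into a java.util.concurrent.LinkedBlockingQueue from an onComplete callback, and let a single consumer block on take() and then drainTo() whatever else is already there. This is a minimal sketch assuming Scala 2.12/2.13; QueueBatchSaver and the maxBatch parameter are hypothetical, and failed futures are simply skipped:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}

object QueueBatchSaver {
  // Hypothetical helper: hands saver batches of already-completed outputs, at most maxBatch each.
  def magicSaver[T](eventualOutputs: Seq[Future[T]], saver: Seq[T] => Unit, maxBatch: Int = 100)
                   (implicit ec: ExecutionContext): Unit = {
    require(maxBatch >= 1, "maxBatch must be positive")
    val queue = new LinkedBlockingQueue[Try[T]]() // unbounded, so put() never blocks a callback
    // Each future enqueues its result (success or failure) as soon as it completes.
    eventualOutputs.foreach(f => f.onComplete(result => queue.put(result)))

    var pending = eventualOutputs.size
    while (pending > 0) {
      val batch = new java.util.ArrayList[Try[T]]()
      batch.add(queue.take())            // block until at least one result is available
      queue.drainTo(batch, maxBatch - 1) // then grab whatever else is ready, without blocking
      pending -= batch.size
      // This sketch drops failed futures; real code would log or retry them.
      val outputs: Seq[T] = (0 until batch.size).map(i => batch.get(i)).collect { case Success(v) => v }
      if (outputs.nonEmpty) saver(outputs) // runs on the calling thread, one batch at a time
    }
  }
}
```

Because saver blocks for about 5 seconds, outputs that complete in the meantime pile up in the queue and the next drainTo picks them up together, so batches grow with the load instead of waiting for a fixed count of 100.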

Tags: scala, future

Answer (0 votes)

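I'm posting my solution here for reference. It avoids a fixed batch size altogether and calls processOutput as soon as outputs are available (each call receives every output that completed since the previous check), which is the best case under the constraints I described.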

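import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

def magicSaver[T, R](eventualOutputs: Seq[Future[T]],
                     processOutput: Seq[T] => R)(implicit ec: ExecutionContext): Seq[R] = {
  // The original used logInfo from a logging trait that is not shown; println stands in for it
  println(s"Size of outputs to save: ${eventualOutputs.size}")

  var remaining: Seq[Future[T]] = eventualOutputs
  val processorOutput: mutable.ListBuffer[R] = new mutable.ListBuffer[R]
  do {
    val (currentCompleted: Seq[Future[T]], currentRemaining: Seq[Future[T]]) = remaining.partition(_.isCompleted)
    if (remaining.size == currentRemaining.size) {
      Thread.sleep(100) // nothing finished since the last check: back off briefly, then poll again
    } else {
      println(s"Got ${currentCompleted.size} completed records, remaining ${currentRemaining.size}")
      // Every future here is already completed, so Duration.Zero never times out (a failed one rethrows)
      val completed = currentCompleted.map(Await.result(_, Duration.Zero))
      processorOutput.append(processOutput(completed))
    }
    remaining = currentRemaining
  } while (remaining.nonEmpty)
  processorOutput.toList // ListBuffer is a mutable Seq; convert so this also compiles on Scala 2.13
}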
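The Thread.sleep(100) in the solution above can be replaced with a blocking wait on Future.firstCompletedOf(remaining), which completes as soon as any remaining future does; this also puts the otherwise unused implicit ExecutionContext to work. A sketch with the same signature, where FirstCompletedSaver is a hypothetical wrapper object:

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FirstCompletedSaver {
  // Same contract as the answer's magicSaver, but blocks instead of polling with Thread.sleep.
  def magicSaver[T, R](eventualOutputs: Seq[Future[T]], processOutput: Seq[T] => R)
                      (implicit ec: ExecutionContext): Seq[R] = {
    var remaining: Seq[Future[T]] = eventualOutputs
    val processorOutput = mutable.ListBuffer.empty[R]
    while (remaining.nonEmpty) {
      // Completes as soon as any remaining future completes (successfully or not);
      // Await.ready only waits, it does not rethrow the failure.
      Await.ready(Future.firstCompletedOf(remaining), Duration.Inf)
      val (completed, stillRunning) = remaining.partition(_.isCompleted)
      // All futures in `completed` are done, so Duration.Zero cannot time out;
      // a failed future rethrows here, as in the answer above.
      processorOutput += processOutput(completed.map(f => Await.result(f, Duration.Zero)))
      remaining = stillRunning
    }
    processorOutput.toList
  }
}
```

Each round registers a callback on every remaining future, so the bookkeeping is quadratic in the worst case; for hundreds of tasks that should be negligible next to 5-second saves.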