// I have hundreds of tasks converting inputs into outputs, which should be persisted.
/** A single unit of work to be executed; `i` identifies the input. */
final case class Op(i: Int)

/** The result produced for an [[Op]]; `i` carries over the identifier of the input it came from. */
final case class Output(i: Int)
// All operations that have to be executed. Left as a placeholder (`???`) in this snippet.
val inputs: Seq[Op] = ??? // Number of inputs is huge
/**
 * Simulates a long-running computation for a single input.
 *
 * Sleeps for a random duration between 1000 and 1999 ms, so the completion order of
 * concurrently running tasks cannot be predicted, then prints the processed input.
 *
 * @param op the input to process
 * @return an [[Output]] carrying the same `i` as `op`
 */
def executeLongRunning(op: Op): Output = {
  val delayMillis = 1000 + Random.nextInt(1000)
  Thread.sleep(delayMillis)
  // Explicit tuple: prints exactly what the two-argument println call produced via auto-tupling.
  println(("<==", op))
  Output(op.i)
}
/**
 * Simulates persisting a batch of outputs.
 *
 * The whole body runs inside `synchronized`, so only one save executes at a time:
 * persisting is itself long-running and cannot be parallelized (internally it uses
 * a blocking queue). The simulated persist time is a fixed 5000 ms, independent of
 * `outputs.size`. The order of persisted records does not matter.
 *
 * @param outputs the batch of outputs to persist
 */
def executeSingleThreadedSave(outputs: Seq[Output]): Unit = synchronized {
  val persistMillis = 5000L
  Thread.sleep(persistMillis)
  // Explicit tuple: prints exactly what the two-argument println call produced via auto-tupling.
  println(("==>", outputs))
}
// TODO: this needs to be implemented
/**
 * Persists the results of `eventualOutputs` by handing them to `saver`.
 *
 * Currently a placeholder: the body is `???`, so calling it throws `NotImplementedError`.
 *
 * @param eventualOutputs the pending outputs, one future per input
 * @param saver           function that persists a batch of outputs
 */
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = ???
// Start every long-running task as its own Future, then hand the pending results to the saver.
val eventualOutputs: Seq[Future[Output]] = inputs.map { op =>
  Future {
    executeLongRunning(op)
  }
}
magicSaver(eventualOutputs, executeSingleThreadedSave)
我可以这样实现 magicSaver:
/**
 * Simplest implementation: blocks until every future has completed, then persists all
 * outputs with a single `saver` call.
 *
 * Nothing is persisted before the last input has been processed. If any future fails,
 * `Await.result` rethrows that failure and `saver` is never called.
 *
 * @param eventualOutputs the pending outputs, one future per input
 * @param saver           function that persists a batch of outputs
 */
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  val allOutputs: Future[Seq[Output]] = Future.sequence(eventualOutputs)
  val outputs: Seq[Output] = Await.result(allOutputs, Duration.Inf)
  saver(outputs)
}
但这有一个很大的缺点:我们要等所有输入都处理完之后,才能开始持久化输出,这从容错的角度来看并不理想。
另一种实现是:
/**
 * Persists every output individually as soon as its future completes successfully.
 *
 * Registers one callback per future and returns immediately; it does not wait for the
 * saves to finish. Each callback calls `saver` with a single-element batch. Failed
 * futures are ignored: `Future.foreach` only runs its callback on success.
 *
 * Uses `Future.foreach` instead of `Future.onSuccess`, which is deprecated since
 * Scala 2.12 and removed in 2.13.
 *
 * @param eventualOutputs the pending outputs, one future per input
 * @param saver           function that persists a batch of outputs
 */
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  eventualOutputs.foreach { eventualOutput =>
    eventualOutput.foreach(output => saver(Seq(output)))
  }
}
但这会把执行时间拉长到 inputs.size * 5 秒(因为保存操作是同步的),这是不可接受的。
我希望有一种方法,能在已完成的 Future 数量达到某个折中的规模(比如 100)时,把它们合并成一批一起处理,但我不确定怎样才能做得干净利落,而不需要显式地编写轮询逻辑。
/**
 * Polling implementation: persists outputs in batches of roughly 100 completed futures.
 *
 * Repeatedly polls the pending futures every 100 ms until at least 100 of them have
 * completed (or all remaining ones have), saves that batch with one `saver` call, and
 * continues with the futures that are still running. Blocks the calling thread until
 * every output has been saved. For empty input `saver` is not called at all.
 *
 * If any completed future has failed, `Await.result` rethrows its exception and the
 * remaining outputs are not saved.
 *
 * @param eventualOutputs the pending outputs, one future per input
 * @param saver           function that persists a batch of outputs
 */
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  val batchSize = 100
  val pollIntervalMillis = 100L

  // Blocks until at least `batchSize` of `pending` are completed, or all of them are.
  // Returns the completed outputs together with the futures that are still running.
  @scala.annotation.tailrec
  def awaitBatch(pending: Seq[Future[Output]]): (Seq[Output], Seq[Future[Output]]) = {
    val (done, notDone) = pending.partition(_.isCompleted)
    if (done.size >= batchSize || notDone.isEmpty) {
      // Every future in `done` is already completed, so a zero timeout never actually waits.
      (done.map(Await.result(_, Duration.Zero)), notDone)
    } else {
      Thread.sleep(pollIntervalMillis)
      awaitBatch(pending)
    }
  }

  // Saves one batch after another until no pending futures are left.
  @scala.annotation.tailrec
  def drain(pending: Seq[Future[Output]]): Unit =
    if (pending.nonEmpty) {
      val (batch, stillRunning) = awaitBatch(pending)
      saver(batch)
      drain(stillRunning)
    }

  drain(eventualOutputs)
}
有什么优雅的解决方案吗?
我把我的解决方案贴在这里,供参考。它的好处是完全避免了按固定数量分批,
而且一旦有输出可用就立即调用 processOutput;在我所说的约束条件下,这是最佳情况。
/**
 * Processes the results of `eventualOutputs` in whatever batches happen to be ready.
 *
 * Polls the pending futures: whenever at least one has completed, all currently
 * completed results are passed to `processOutput` in a single call; when none has
 * completed, the calling thread sleeps for 100 ms before polling again. Blocks until
 * every future has been handled.
 *
 * If a completed future has failed, `Await.result` rethrows its exception and
 * processing stops.
 *
 * @param eventualOutputs the pending results
 * @param processOutput   function applied to each batch of completed results
 * @param ec              execution context; kept for interface compatibility, not used by the body
 * @tparam T type of the value produced by each future
 * @tparam R type returned by `processOutput` for a batch
 * @return the results of all `processOutput` calls, in call order, as an immutable sequence
 */
def magicSaver[T, R](eventualOutputs: Seq[Future[T]],
                     processOutput: Seq[T] => R)(implicit ec: ExecutionContext): Seq[R] = {
  logInfo(s"Size of outputs to save: ${eventualOutputs.size}")

  // Accumulates batch results in an immutable Vector so no mutable buffer escapes to the caller.
  @scala.annotation.tailrec
  def drain(pending: Seq[Future[T]], results: Vector[R]): Vector[R] =
    if (pending.isEmpty) {
      results
    } else {
      val (done, notDone) = pending.partition(_.isCompleted)
      if (done.isEmpty) {
        // Nothing new has finished since the last poll; back off before checking again.
        Thread.sleep(100)
        drain(notDone, results)
      } else {
        logInfo(s"Got ${done.size} completed records, remaining ${notDone.size}")
        // Every future in `done` is already completed, so a zero timeout never actually waits.
        val batch = done.map(Await.result(_, Duration.Zero))
        drain(notDone, results :+ processOutput(batch))
      }
    }

  drain(eventualOutputs, Vector.empty)
}