我目前正在开发一个必须处理源自压缩文件的元素流的应用程序。此应用程序的限制应为系统中的内存中只能有X数量的元素可以处理。我看了一下following documentation,但似乎缓冲区一旦装满,就必须删除我不想要的元素。优选地,只要达到元素的最大数量,就应该暂停源(从文件中读取),并且一旦数量再次下降,就继续。
有什么想法吗?
对于Akka Streams中的这种情况,我会使用mapAsync
并注意不要使用.async
(因为后者至少在没有自定义ActorMaterializer
或调度程序的情况下调度任务,因此不会传播背压紧紧地)。
object AkkaStreamLimitInflight {
implicit val actorSystem = ActorSystem("foo")
implicit val mat = ActorMaterializer()
def main(args: Array[String]): Unit = {
import actorSystem.dispatcher
val inflight = new AtomicInteger(0)
def printWithInflight(msg: String): Unit = {
println(s"$msg (${inflight.get} inflight)")
}
val source = Source.unfold(0) { state =>
println(s"Emitting $state (${inflight.incrementAndGet()} inflight)")
Some((state + 1, state))
}.take(10)
def quintuple(i: Int): (Int, Int) = {
val quintupled = 5 * i
printWithInflight(s"$i quintupled is $quintupled (originally $i)")
(i, quintupled)
}
def minusOne(tup: (Int, Int)): (Int, Int) = {
val (original, i) = tup
val minus1 = i - 1
printWithInflight(s"$i minus one is $minus1 (originally $original)")
(original, minus1)
}
def double(tup: (Int, Int)): (Int, Int) = {
val (original, i) = tup
val doubled = 2 * i
printWithInflight(s"$i doubled is $doubled (originally $original)")
(original, doubled)
}
val toUnit = Flow[(Int, Int)]
.map { case (original, i) =>
println(s"Done with $i (originally $original)
(${inflight.decrementAndGet()} inflight)")
()
}
val fut: Future[Done] = source
.mapAsync(1) { i => Future { quintuple(i) }}
.mapAsync(1) { tup => Future { minusOne(tup) }.map(double) }
.via(toUnit)
.runWith(Sink.ignore)
fut.onComplete { _ => actorSystem.terminate() }
}
}
在这种情况下,飞行计数将永远不会超过4(由于运算符融合,第一个mapAsync
之前为一个,第一个mapAsync
中为一个,第二个mapAsync
中为一个,第二个mapAsync
之后为一个)。 C0])。如果要限制特定阶段的飞行中元素的数量,这是要走的路。
但是,如果您只想限制流中正在进行的工作,将业务逻辑移动到单个Future
中,并在mapAsync
中生成有限数量的期货,那么您只需旋转一个旋钮即可:] >
val fut: Future[Done] = source
.mapAsync(5) { i =>
Future { quintuple(i) }
.map(minusOne)
.map(double)
}
.via(toUnit)
.runWith(Sink.ignore)
您所说的被称为溢出策略。在documentation you link to中,第一个示例显示了所需的溢出策略:OverflowStrategy.backpressure