Akka流-限制整个流一次处理的元素数量

问题描述 投票:0回答:2

我目前正在开发一个必须处理源自压缩文件的元素流的应用程序。此应用程序的限制应为系统中的内存中只能有X数量的元素可以处理。我看了一下following documentation,但似乎缓冲区一旦装满,就必须删除我不想要的元素。优选地,只要达到元素的最大数量,就应该暂停源(从文件中读取),并且一旦数量再次下降,就继续。

有什么想法吗?

scala akka akka-stream
2个回答
0
投票

对于Akka Streams中的这种情况,我会使用mapAsync并注意不要使用.async(因为后者至少在没有自定义ActorMaterializer或调度程序的情况下调度任务,因此不会传播背压紧紧地)。

object AkkaStreamLimitInflight {
  implicit val actorSystem = ActorSystem("foo")
  implicit val mat = ActorMaterializer()

  def main(args: Array[String]): Unit = {
    import actorSystem.dispatcher

    val inflight = new AtomicInteger(0)

    def printWithInflight(msg: String): Unit = {
      println(s"$msg (${inflight.get} inflight)")
    }

    val source = Source.unfold(0) { state => 
      println(s"Emitting $state (${inflight.incrementAndGet()} inflight)")
      Some((state + 1, state))
    }.take(10)

    def quintuple(i: Int): (Int, Int) = {
      val quintupled = 5 * i
      printWithInflight(s"$i quintupled is $quintupled (originally $i)")
      (i, quintupled)
    }

    def minusOne(tup: (Int, Int)): (Int, Int) = {
      val (original, i) = tup
      val minus1 = i - 1
      printWithInflight(s"$i minus one is $minus1 (originally $original)")
      (original, minus1)
    }

    def double(tup: (Int, Int)): (Int, Int) = {
      val (original, i) = tup
      val doubled = 2 * i
      printWithInflight(s"$i doubled is $doubled (originally $original)")
      (original, doubled)
    }

    val toUnit = Flow[(Int, Int)]
      .map { case (original, i) =>
        println(s"Done with $i (originally $original) 
(${inflight.decrementAndGet()} inflight)")
        ()
      }

    val fut: Future[Done] = source
      .mapAsync(1) { i => Future { quintuple(i) }}
      .mapAsync(1) { tup => Future { minusOne(tup) }.map(double) }
      .via(toUnit)
      .runWith(Sink.ignore)


    fut.onComplete { _ => actorSystem.terminate() }
  }
}

在这种情况下,飞行计数将永远不会超过4(由于运算符融合,第一个mapAsync之前为一个,第一个mapAsync中为一个,第二个mapAsync中为一个,第二个mapAsync之后为一个)。 C0])。如果要限制特定阶段的飞行中元素的数量,这是要走的路。

但是,如果您只想限制流中正在进行的工作,将业务逻辑移动到单个Future中,并在mapAsync中生成有限数量的期货,那么您只需旋转一个旋钮即可:] >

val fut: Future[Done] = source
  .mapAsync(5) { i =>
    Future { quintuple(i) }
      .map(minusOne)
      .map(double)
  }
  .via(toUnit)
  .runWith(Sink.ignore)

0
投票

您所说的被称为溢出策略。在documentation you link to中,第一个示例显示了所需的溢出策略:OverflowStrategy.backpressure

© www.soinside.com 2019 - 2024. All rights reserved.