Akka流中的分批/取景

问题描述 投票:0回答:1

我有一个Source[Animal],其中Animal有两种类型CatDogsource类似于dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ...我正在尝试将其转换为以下Source[Seq[Animal]] - (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ...运作方式是:

  • 每批次最多3只狗,每批次最多1只猫(或者以下方法也可以:每批次最多4只动物,每批次最多1只猫)
  • 猫只能是批处理中的最后一个元素(又名成帧)
  • 而且,我无法在示例中显示速度,但是应该有一个超时之后,该批次(即使没有装满也没有猫)仍然会发出。类似于groupedWithin(4, FiniteDuration(3, SECONDS))
  • 总体顺序很重要,必须予以维护

我一直在尝试使用batchWeightedgroupedWithin,但是我还没有合适的解决方案。

[我尝试过的一个想法是将Dog权衡为1,将Cat权衡为1000,然后将batchWeightedmax weight = 1003一起使用,但这不能确保Cat始终是最后一个批处理元素...与max weight = 3相同尝试总是将Cat放在单独的组中。

[如果存在batchWithintakeWhile的混合体(没有终止),则可能已经解决了该用例。

这是一个很简单的解决方案,如果它只是在List上进行迭代,但是由于使用FlowOps受到限制,这有点挑战性]

编辑:目前,我正在执行以下操作:

  .groupedWithin(4, FiniteDuration(4, SECONDS))
  .map(frameBatch(_, Vector(), 0))
  // groupedWithin internally returns a Vector so is fast for indexed operations

  @tailrec
  private def frameBatch(
      items: Seq[Animal],
      result: Vector[Seq[Animal]],
      offset: Int
    ): Vector[Seq[Animal]] = {
    val index = seq.indexWhere(!_.isDog, offset) // assume there's an isDog() for simplicity
    if (index == -1) {
      if (offset == 0) {
        Vector(items)
      } else {
        result :+ items.slice(offset, items.size)
      }
    } else {
      frameBatchAtSyncs(items, result :+ items.slice(offset, index), index + 1)
    }
  }
scala akka-stream
1个回答
0
投票
这样的事情应该起作用

import scala.concurrent.duration._ def batchFrame(source: Source[Animal], interval: FiniteDuration): Source[Seq[Animal]] = { val dataSource: Source[Option[Animal]] = source.map(Some _) val timerSource: Source[Option[Animal]] = Source.tick(interval, interval, Option.empty[Animal]) val merged: Source[Option[Animal]] = dataSource.merge(timerSource) merged.statefulMapConcat { () => var dogCount = 0 var frame: List[Animal] = Nil in: Option[Animal] => { in match { case Some(cat: Cat) => val emit = (cat :: frame).reverse dogCount = 0 frame = Nil emit case Some(dog: Dog) if dogCount < 3 => frame = (dog :: frame) dogCount += 1 Nil case Some(dog: Dog) => val emit = (dog :: frame).reverse dogCount = 0 frame = Nil emit case None => val emit = frame.reverse dogCount = 0 frame = Nil emit } } }

© www.soinside.com 2019 - 2024. All rights reserved.