I have a Source[Animal], where Animal has two subtypes, Cat and Dog. The source looks like dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ...
I'm trying to turn it into the following Source[Seq[Animal]]: (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ...
It should behave like groupedWithin(4, FiniteDuration(3, SECONDS)), with the additional rule that a Cat always closes the current group.
I have been trying to use batchWeighted and groupedWithin, but I don't have a working solution yet.
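One idea I tried was to give Dog a weight of 1 and Cat a weight of 1000, and then use batchWeighted with max weight = 1003, but this doesn't ensure that a Cat is always the last element of a batch. Trying the same thing with max weight = 3 always puts each Cat in a group of its own.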
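To make the weighting problem concrete, here is a small stdlib-only model. It assumes downstream is always slower than upstream, so batchWeighted keeps packing greedily until the next element would push the batch over the max weight. Real batchWeighted groupings depend on when downstream pulls, so treat this as an illustration, not a reproduction. Animal, Dog, Cat and WeightedModel.pack are hypothetical stand-ins.

```scala
sealed trait Animal
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

// Hypothetical model of batchWeighted under a permanently slow downstream.
object WeightedModel {
  def weight(a: Animal): Long = a match {
    case _: Dog => 1L
    case _: Cat => 1000L
  }

  // Greedy packing: add elements while the running weight stays <= maxWeight.
  // An element that would overflow starts a new batch (a single heavy element may stand alone).
  def pack(xs: List[Animal], maxWeight: Long): List[List[Animal]] = {
    // state: (finished batches in reverse, current batch in reverse, current weight)
    val (done, current, _) =
      xs.foldLeft((List.empty[List[Animal]], List.empty[Animal], 0L)) {
        case ((acc, batch, w), a) =>
          val wa = weight(a)
          if (batch.nonEmpty && w + wa > maxWeight) (batch.reverse :: acc, List(a), wa)
          else (acc, a :: batch, w + wa)
      }
    (if (current.isEmpty) done else current.reverse :: done).reverse
  }
}
```

Under this model, max weight 1003 on dog1 … dog8 yields (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3, dog6, dog7, dog8): nothing in the weights stops a Cat from opening a batch. With max weight 3, every Cat ends up in a batch of its own.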
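If there were a hybrid of batchWithin and takeWhile (one that doesn't terminate the stream), it might solve this use case. It would be a pretty simple problem if I were just iterating over a List, but being limited to FlowOps makes it somewhat challenging.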
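For comparison, the List version might look like the following sketch. It assumes Animal is a sealed trait with Dog and Cat case classes (hypothetical definitions) and only models the size limit and the Cat rule. The time limit of groupedWithin has no List equivalent, so groups such as (dog6, dog7, dog8) in the example, which presumably come from the timeout, are not reproduced.

```scala
sealed trait Animal
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

object ListFraming {
  // Splits `xs` into frames of at most `maxSize` elements; a Cat always closes
  // the frame it lands in, so it is always the last element of its frame.
  def frames(xs: List[Animal], maxSize: Int): List[List[Animal]] = {
    require(maxSize > 0, "maxSize must be positive")
    // state: (finished frames in reverse, current frame in reverse, current size)
    val (done, current, _) =
      xs.foldLeft((List.empty[List[Animal]], List.empty[Animal], 0)) {
        case ((acc, frame, size), a) =>
          val next = a :: frame
          val closes = a match {
            case _: Cat => true
            case _: Dog => size + 1 == maxSize
          }
          if (closes) (next.reverse :: acc, Nil, 0) else (acc, next, size + 1)
      }
    // a trailing frame without a Cat is still emitted
    val all = if (current.isEmpty) done else current.reverse :: done
    all.reverse
  }
}
```

For example, frames applied to dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3 with maxSize = 4 returns (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3).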
Edit: currently I'm doing the following:
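.groupedWithin(4, FiniteDuration(4, SECONDS))
// mapConcat (rather than map) flattens the Vector of groups into a Source[Seq[Animal]]
.mapConcat(frameBatch(_, Vector(), 0))

import scala.annotation.tailrec

// groupedWithin internally returns a Vector, so indexed operations are fast
@tailrec
private def frameBatch(
    items: Seq[Animal],
    result: Vector[Seq[Animal]],
    offset: Int
): Vector[Seq[Animal]] = {
  // index of the next Cat at or after `offset` (assume there's an isDog for simplicity)
  val index = items.indexWhere(!_.isDog, offset)
  if (index == -1) {
    // no Cat left: the remaining Dogs form the last group (skipped if nothing is left)
    if (offset >= items.size) result
    else result :+ items.slice(offset, items.size)
  } else {
    // the Cat closes the group, so include it by slicing up to index + 1
    frameBatch(items, result :+ items.slice(offset, index + 1), index + 1)
  }
}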
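import akka.stream.scaladsl.Source
import scala.concurrent.duration._

def batchFrame(source: Source[Animal, _], interval: FiniteDuration): Source[Seq[Animal], _] = {
  val dataSource: Source[Option[Animal], _] = source.map(Some(_))
  // emits None every `interval`, which flushes a partially filled frame
  val timerSource: Source[Option[Animal], _] = Source.tick(interval, interval, Option.empty[Animal])
  // eagerComplete = true: finish when `source` finishes (the tick source never does).
  // A frame still pending at that point is dropped; statefulMapConcat has no completion hook.
  val merged: Source[Option[Animal], _] = dataSource.merge(timerSource, eagerComplete = true)
  merged.statefulMapConcat { () =>
    var dogCount = 0
    var frame: List[Animal] = Nil // accumulated in reverse order
    (in: Option[Animal]) =>
      in match {
        case Some(cat: Cat) =>
          // a Cat always closes the current frame
          val emit = (cat :: frame).reverse
          dogCount = 0
          frame = Nil
          List(emit) // wrap it: emit one Seq, not its individual elements
        case Some(dog: Dog) if dogCount < 3 =>
          frame = dog :: frame
          dogCount += 1
          Nil
        case Some(dog: Dog) =>
          // fourth Dog: the frame is full
          val emit = (dog :: frame).reverse
          dogCount = 0
          frame = Nil
          List(emit)
        case None =>
          // timer tick: flush whatever has accumulated, if anything
          val emit = frame.reverse
          dogCount = 0
          frame = Nil
          if (emit.isEmpty) Nil else List(emit)
      }
  }
}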
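The state machine inside statefulMapConcat can be exercised without materializing a stream by pulling it out into a plain class. The sketch below is a hypothetical refactoring (FrameState and step are illustrative names, and Animal, Dog and Cat are assumed definitions) that mirrors the batchFrame logic: None stands for a timer tick, a Cat closes the frame, and so does the fourth Dog. It skips empty frames on a tick.

```scala
sealed trait Animal
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

// Hypothetical, stream-free version of the statefulMapConcat logic in batchFrame.
// `None` models a timer tick; each call returns the frames to emit (zero or one).
final class FrameState(maxDogs: Int = 4) {
  require(maxDogs > 0, "maxDogs must be positive")
  private var dogCount = 0
  private var frame: List[Animal] = Nil // accumulated in reverse order

  // Emit the current frame (if any) and reset the state.
  private def flush(): List[List[Animal]] = {
    val emit = frame.reverse
    dogCount = 0
    frame = Nil
    if (emit.isEmpty) Nil else List(emit)
  }

  def step(in: Option[Animal]): List[List[Animal]] = in match {
    case Some(cat: Cat) =>
      frame = cat :: frame // a Cat always closes the frame
      flush()
    case Some(dog: Dog) =>
      frame = dog :: frame
      dogCount += 1
      if (dogCount == maxDogs) flush() else Nil // the maxDogs-th Dog closes it
    case None =>
      flush() // timer tick: flush a partial frame
  }
}
```

If ticks arrive right after dog8 and after dog11, feeding the question's sequence through step one event at a time produces exactly (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4).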