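I'm learning Akka Streams. I have a stream of records, many per time unit, already ordered by time (they come from Slick), and I want to batch them into time groups for processing by detecting when the time step changes.
Example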
case class Record(time: Int, payload: String)
If the incoming stream is
Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...
I would like to transform it into
Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
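So far I have only found grouping by a fixed number of records, or splitting into many substreams, but from my perspective I don't need multiple substreams.
Update: I found batch, but it looks more concerned with backpressure than with simply batching all the time.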
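statefulMapConcat is the multitool of the Akka Streams library. It lets you keep mutable state across elements and emit zero or more elements for each incoming one:
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

// Assumes Akka 2.6+, where an implicit ActorSystem supplies the materializer
implicit val system: ActorSystem = ActorSystem("batching")

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // sentinel: its time change flushes the last group

records
  .statefulMapConcat { () =>
    // State is created once per materialization of the stream
    var currentTime = 0
    var payloads: Seq[String] = Nil
    record =>
      if (record.time == currentTime) {
        // Same time step: keep accumulating and emit nothing
        payloads = payloads :+ record.payload
        Nil
      } else {
        // Time step changed: emit the finished group and start a new one
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)
Running the above prints one (time, payloads) tuple per time step, preceded by an empty (0,List()) tuple for the initial state. You can adapt the example to print Batch objects.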
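As a rough illustration of that adaptation, here is a minimal sketch that emits Batch values and skips the empty initial group. It is written against plain Scala collections so it can be run without Akka. The Batch field names, the TimeBatching object, batcher, run, and EndMarker are all hypothetical names chosen for this sketch, and it assumes real records never have a negative time.

```scala
final case class Record(time: Int, payload: String)
// Field names are an assumption; the question only shows Batch(1, Seq(...))
final case class Batch(time: Int, payloads: Seq[String])

object TimeBatching {
  // Hypothetical end-of-stream marker: assumes no real record has a negative time
  val EndMarker: Record = Record(-1, "end")

  // Creates a fresh stateful step function, the shape statefulMapConcat expects
  def batcher(): Record => List[Batch] = {
    var current: Option[Int] = None          // no group open yet
    var payloads = Vector.empty[String]

    (record: Record) =>
      if (current.contains(record.time)) {
        // Same time step: accumulate and emit nothing
        payloads = payloads :+ record.payload
        Nil
      } else {
        // Time step changed: emit the finished group, if there was one
        val finished = current.map(t => Batch(t, payloads)).toList
        current = Some(record.time)
        payloads = Vector(record.payload)
        finished
      }
  }

  // Collection-based stand-in for the stream: append the marker, then flatMap
  def run(records: List[Record]): List[Batch] = {
    val step = batcher()
    (records :+ EndMarker).flatMap(step)
  }
}
```

With Akka on the classpath, the same function should plug in as `records.concat(Source.single(TimeBatching.EndMarker)).statefulMapConcat(() => TimeBatching.batcher())`. Using an Option for the current time is a design choice that avoids the leading empty group; the trailing marker is still needed because the last group is only emitted when a later element arrives.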