Akka Streams batch processing


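I'm learning Akka Streams. I have a stream of records, many per time unit, already sorted by time (they come from Slick), and I want to split them into time groups for processing by detecting when the time step changes.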

Example

case class Record(time: Int, payload: String)

If the incoming stream is

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

I want to transform it into

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
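The grouping itself does not depend on Akka, so as a reference for the expected behaviour, here is a minimal sketch on an in-memory Seq. It assumes Batch is a case class with time and payloads fields (the question uses Batch without defining it) and that the input is already sorted by time; the object name Batches and the method toBatches are hypothetical.

```scala
case class Record(time: Int, payload: String)
// Hypothetical definition: the question uses Batch but does not declare it.
case class Batch(time: Int, payloads: Seq[String])

object Batches {
  // Groups consecutive records that share the same time into one Batch.
  // Assumes the input is sorted by time; a time that reappears later
  // would start a new Batch rather than being merged into the earlier one.
  def toBatches(records: Seq[Record]): Vector[Batch] =
    records.foldLeft(Vector.empty[Batch]) { (acc, r) =>
      acc.lastOption match {
        case Some(last) if last.time == r.time =>
          // same time step: append the payload to the open batch
          acc.updated(acc.size - 1, last.copy(payloads = last.payloads :+ r.payload))
        case _ =>
          // time step changed (or first record): open a new batch
          acc :+ Batch(r.time, Vector(r.payload))
      }
    }
}
```

A stream version has to do the same thing incrementally: remember the open batch, emit it when the time changes, and flush it when the input ends.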

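So far I have only found ways to group by a fixed number of records, or to split the stream into many substreams, but as far as I can tell I don't need multiple substreams.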

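Update: I found batch, but it seems to be about backpressure (it aggregates elements only while downstream is slower than upstream) rather than batching all the time.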

scala akka akka-stream
1 Answer
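statefulMapConcat is the multitool of the Akka Streams library:

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

case class Record(time: Int, payload: String)

// Akka 2.6+: an implicit ActorSystem also provides the Materializer that runForeach needs
implicit val system: ActorSystem = ActorSystem("batching")

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  // sentinel record that flushes the last group; assumes no real record has time 0
  .concat(Source.single(Record(0, "notused")))

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        // same time step: keep accumulating, emit nothing yet
        payloads = payloads :+ record.payload
        Nil
      } else {
        // time step changed: emit the finished group, skipping the empty initial state
        val previousState =
          if (payloads.nonEmpty) List((currentTime, payloads)) else Nil
        currentTime = record.time
        payloads = Seq(record.payload)
        previousState
      }
  }
  .runForeach(println)

Running the above prints the following:

(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

You can adjust the example to print Batch objects instead of tuples.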
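One way to adapt the statefulMapConcat approach so that it emits Batch objects directly is sketched below, under stated assumptions: Batch is a hypothetical case class (the question never defines it), batchByTime and TimeBatching are hypothetical names, and Akka 2.6+ is assumed so that an implicit ActorSystem supplies the Materializer. Instead of a sentinel Record with a reserved time, each element is wrapped in Option and a trailing None flushes the last group.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

case class Record(time: Int, payload: String)
// Hypothetical: the question uses Batch but never defines it.
case class Batch(time: Int, payloads: Seq[String])

object TimeBatching {
  // Hypothetical helper: groups consecutive records sharing a time into one Batch.
  // Assumes the upstream is already sorted by time (as the Slick query is in the question).
  def batchByTime[M](records: Source[Record, M]): Source[Batch, M] =
    records
      .map(r => Option(r))
      // None marks end of input, so no time value has to be reserved as a sentinel
      .concat(Source.single(Option.empty[Record]))
      .statefulMapConcat { () =>
        var current: Option[Batch] = None // the batch currently being accumulated

        elem =>
          (elem, current) match {
            case (Some(r), Some(b)) if b.time == r.time =>
              // same time step: append to the open batch, emit nothing yet
              current = Some(b.copy(payloads = b.payloads :+ r.payload))
              Nil
            case (Some(r), previous) =>
              // new time step: start a new batch and emit the finished one, if any
              current = Some(Batch(r.time, Vector(r.payload)))
              previous.toList
            case (None, previous) =>
              // end of input: flush whatever batch is still open
              current = None
              previous.toList
          }
      }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("batching")
    val input = Source(List(Record(1, "a"), Record(1, "k"), Record(2, "r"), Record(3, "!")))
    val done = batchByTime(input).runForeach(println)
    Await.ready(done, 5.seconds)
    system.terminate()
  }
}
```

Wrapping every element in Option costs an allocation per record; if time 0 can never occur in the data, the simpler sentinel Record used in the answer does the same job.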