不可改变的流批量处理

问题描述 投票:2回答:1

this问题中的解决方案是否存在一个不变的替代方案,该解决方案将数据按批处理批量添加:]

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        payloads = payloads :+ record.payload
        Nil
      } else {
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)

产品

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

此问题的解决方案是否有一个不变的替代方案,它可以在流中批量处理数据:val records = Source(List(Record(Record(1,“ a”),Record(1,“ k”),Record(1) ,“ k”),...

scala akka akka-stream
1个回答
0
投票

以良好的方式,不变性和无状态是编程并发分布式软件的重要方面。在使用scastie中的groupBy时,我将与您的示例代码共享此akka。让我知道,是否可以帮助您。输出就像

© www.soinside.com 2019 - 2024. All rights reserved.