this问题中的解决方案是否存在一个不变的替代方案,该解决方案将数据按批处理批量添加:]
val records = Source(List( Record(1, "a"), Record(1, "k"), Record(1, "k"), Record(1, "a"), Record(2, "r"), Record(2, "o"), Record(2, "c"), Record(2, "k"), Record(2, "s"), Record(3, "!") )) .concat(Source.single(Record(0, "notused"))) // needed to print the last element records .statefulMapConcat { () => var currentTime = 0 var payloads: Seq[String] = Nil record => if (record.time == currentTime) { payloads = payloads :+ record.payload Nil } else { val previousState = (currentTime, payloads) currentTime = record.time payloads = Seq(record.payload) List(previousState) } } .runForeach(println)
产品
(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))
此问题的解决方案是否有一个不变的替代方案,它可以在流中批量处理数据:val records = Source(List(Record(Record(1,“ a”),Record(1,“ k”),Record(1) ,“ k”),...