我有一个带窗口的流,需要根据窗口内发生的所有值加上窗口前发生的最新值来计算一个值。
inputStream.groupByKey
.windowedBy(timeWindow)
.aggregate(Aggregation()) {
case (_, value, aggregation) =>
// ...
}
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream
.map((windowed, aggregation) => {
// here I need access to the last value which
// occurred before the window
}
.to("output")
我怎样才能在流的这个阶段获得前一个值的访问权?
你需要使用处理器API并编写一个自定义的 Processor
(或者如果你把它嵌入到DSL中,你可以使用 transform()
)与附加的windowed store。需要注意的是,windowed-store也只是一个key-value存储,key由记录-key和窗口的开始时间戳组成。