我有一个简单的窗口拓扑结构。
builder.stream("input-topic", Consumed.with(...))
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.transformValues(FrameTransformer::new)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
我想把窗口开始的实际记录偏移量 放到Frame聚合对象中去
我怎样才能从该对象中获取记录偏移量呢?windowAggregator
(aggregate()
handler)函数?
我知道,我可以通过在 FrameTransformer
但这并不能帮助我创建准确的。Frame
对象,用开始和结束的偏移量来描述我的窗口。
我听说有一种方法可以做到这一点,那就是在我的窗口中插入另一个 .transform()
召见 groupByKey()
在那里我可以访问偏移量,但我需要修改我的事件记录的模式,在那里存储偏移量信息。
有没有一种(更简单的)方法来实现我的意图?
更新
事实上,我能够得到准确的窗口开始和结束的偏移量在 Frame
对象的方式如下
builder.stream("input-topic", Consumed.with(...))
.transformValues(EventTransformer::new)
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
但如上所述,以编辑模式为代价的 Event
对象。
如何从windowAggregator(aggregated()处理程序)函数中获得对记录偏移的访问?
你不能。你的方法是使用 transformValues()
前的聚集(并丰富了。Event
对象是正确的方法。
有与会者建议扩大API的范围,以便能够在 "记录元数据 "的范围内进行访问。aggregate()
和其他DSL运营商,但它从未被推到终点(参见 https:/cwiki.apache.orgconfluencedisplayKAFKAKIP-159%3A+Introducing+Rich+functions+to+Streams。).