Kafka Streams Aggregator中的访问记录偏移量

问题描述 投票:0回答:1

我有一个简单的窗口拓扑结构。

builder.stream("input-topic", Consumed.with(...))
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .transformValues(FrameTransformer::new)
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

我想把窗口开始的实际记录偏移量 放到Frame聚合对象中去

我怎样才能从该对象中获取记录偏移量呢?windowAggregator (aggregate() handler)函数?

我知道,我可以通过在 FrameTransformer但这并不能帮助我创建准确的。Frame 对象,用开始和结束的偏移量来描述我的窗口。

我听说有一种方法可以做到这一点,那就是在我的窗口中插入另一个 .transform() 召见 groupByKey()在那里我可以访问偏移量,但我需要修改我的事件记录的模式,在那里存储偏移量信息。

有没有一种(更简单的)方法来实现我的意图?

更新

事实上,我能够得到准确的窗口开始和结束的偏移量在 Frame 对象的方式如下

builder.stream("input-topic", Consumed.with(...))
    .transformValues(EventTransformer::new)
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

但如上所述,以编辑模式为代价的 Event 对象。

apache-kafka-streams
1个回答
2
投票

如何从windowAggregator(aggregated()处理程序)函数中获得对记录偏移的访问?

你不能。你的方法是使用 transformValues() 前的聚集(并丰富了。Event 对象是正确的方法。

有与会者建议扩大API的范围,以便能够在 "记录元数据 "的范围内进行访问。aggregate() 和其他DSL运营商,但它从未被推到终点(参见 https:/cwiki.apache.orgconfluencedisplayKAFKAKIP-159%3A+Introducing+Rich+functions+to+Streams。).

© www.soinside.com 2019 - 2024. All rights reserved.