kafka Streams会话窗口

问题描述 投票:1回答:1

您好我正在使用kafka会话窗口,非活动时间为5分钟。当达到非活动时间并且会话根据密钥下降时,我想要某种反馈。我假设我有

(A,1)

记录“A”是关键字。现在,如果我在5分钟内没有获得任何'A'密钥记录,则会话被删除。

我想在会话结束时做一些操作,让那个会话说(值)* 2。有什么方法可以使用Kafka Stream API实现这一目标

apache-kafka avro apache-kafka-streams
1个回答
1
投票

在缺口时间过后,Kafka Streams不会放弃会话。相反,如果在间隔时间过后,如果具有相同密钥的另一个记录到达并且同时维护两个会话,则将创建新会话。这允许处理无序数据。甚至可能发生,如果无序数据落入间隙并且两个会话彼此“连接”,则会合并两个会话。

默认情况下,会话保持1天。您可以通过SessionWindows#until()方法更改此设置。如果会话过期,它将被静默删除。没有通知。您还需要考虑配置参数window.store.change.log.additional.retention.ms

默认保留设置为Windows#maintainMs()+ 1天。您可以通过在StreamsConfig中指定StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG来覆盖此设置。

因此,如果时间过去,你想要做出反应,你应该考虑允许你根据“偶数时间进度”或挂钟时间注册常规回调(某种计时器)的标点符号。如果会话在一段时间内没有更新并且您认为它已“完成”,则允许您做出反应。

© www.soinside.com 2019 - 2024. All rights reserved.