我使用定期水印按事件时间对消息进行排序,但是如果我配置“withIdleness”,结果会出现很大差异。
基本上,如果我使用“withIdleness”,我会收到大约 90% 的消息。如果没有的话,我只会得到约 5% 的缺货。
环境:
Flink 1.15.2
并行度3
自动水印间隔200ms
来源:
将 3 个 kafkaSourceStream 合并为一个流。
各个kafka源码中的水印策略:
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
.withIdleness(Duration.ofSeconds(5));
Q1:如果我们知道所有主题/分区都有消息,是否应该配置空闲?我有这个问题,因为在重新处理所有这些消息后,我将以“常规方式”使用。
Q2:假设我还有一个没有消息的主题(第四个主题)。这种情况我应该如何处理?我应该只为这个主题配置空闲吗?
感谢您的帮助
withIdleness 的目的是强制等待水印的操作员生成结果,而无需等待超过提供的时间间隔(在本例中为 5 秒)。听起来,在您的情况下,事件相隔 5 秒(或更长)的情况相对常见,这引起了一些头痛。
Q1:对于您希望每个分区都有连续事件流的主题,不要使用 withIdleness。
Q2:是的,在这种情况下您应该配置空闲超时。并将其设置得足够大,这样就不会在“一切照旧”期间被触发。