我正在开发 Kafka Streams 应用程序,我想对按键分组的主题中的数据进行批处理,并进一步发送分组的数据。我不完全确定 Kafka Streams 是否是执行该操作的良好解决方案。
我编写了以下代码来说明这种情况:
KTable<Windowed<Long>, MessageDataAggregated> reduce = input
.mapValues((ValueMapper<MessageData, MessageDataAggregated>) MessageDataAggregated::new)
.groupByKey(Grouped.with(longSerde, messageDataAggregatedSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1L)))
.reduce(MessageDataAggregated::reduce)
.suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(1L), Suppressed.BufferConfig.unbounded()));
reduce.toStream().to("message-stream")
有什么我应该考虑的潜在问题吗?或者还有其他更好的办法解决这个问题吗?
聚合的潜在问题是 - MessageDataAggreated 是否包含一个要附加到的列表。默认情况下,kafka 消息大小为 1MB,因此如果您要附加到列表,则可能会面临超过该阈值的风险。
使用reduce函数时请记住,它不会传播你的墓碑,所以如果你需要下游删除,你必须处理它。一个简单的解决方案是在聚合中设置一个标志,并在聚合后将其映射下来。