kafka 流是从主题分组消息并进一步发送的良好解决方案吗

问题描述 投票:0回答:1

我正在开发 Kafka Streams 应用程序,我想对按键分组的主题中的数据进行批处理,并进一步发送分组的数据。我不完全确定 Kafka Streams 是否是执行该操作的良好解决方案。

我编写了以下代码来说明这种情况:

KTable<Windowed<Long>, MessageDataAggregated> reduce = input
    .mapValues((ValueMapper<MessageData, MessageDataAggregated>) MessageDataAggregated::new)
    .groupByKey(Grouped.with(longSerde, messageDataAggregatedSerde))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1L)))
    .reduce(MessageDataAggregated::reduce)
    .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(1L), Suppressed.BufferConfig.unbounded()));

reduce.toStream().to("message-stream")

有什么我应该考虑的潜在问题吗?或者还有其他更好的办法解决这个问题吗?

apache-kafka-streams
1个回答
0
投票

聚合的潜在问题是 - MessageDataAggreated 是否包含一个要附加到的列表。默认情况下,kafka 消息大小为 1MB,因此如果您要附加到列表,则可能会面临超过该阈值的风险。

使用reduce函数时请记住,它不会传播你的墓碑,所以如果你需要下游删除,你必须处理它。一个简单的解决方案是在聚合中设置一个标志,并在聚合后将其映射下来。

© www.soinside.com 2019 - 2024. All rights reserved.