多个实例上的Kafka Stream拓扑

问题描述 投票:0回答:1

我们有一个流拓扑,可以在多台机器上运行。我们将时间窗口聚合结果存储到状态存储中。由于状态存储正在存储本地数据,我认为应该在另一个主题上进行聚合以进行整体聚合。但似乎我遗漏了一些东西,因为没有一个例子在另一个KStream或Processor上进行整体聚合。

我们是否需要使用groupBy逻辑来存储整体聚合,或者使用GlobalKtable或者只是实现我们自己的合并代码?

这个的正确架构是什么?

在下面的代码中,我试图用一个常量键将所有进入处理器的消息分组,只在一台机器上存储整体聚合,但我认为它会失去Kafka提供的并行性。

dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
        .filter((key, event) -> event != null && event.getClientCreationDate() != null);

 dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
       .groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
       .windowedBy(timeWindow)
       .count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));
apache-kafka apache-kafka-streams stream-processing
1个回答
0
投票

在下面的代码中,我试图用一个常量键将所有进入处理器的消息分组,只在一台机器上存储整体聚合,但我认为它会失去Kafka提供的并行性。

这似乎是正确的方法。是的,你放松了并行性,但这就是全球聚合的工作方式。最后,一台机器必须计算它......

你可以改进的是,采用两步法:即,首先通过“随机”键并行聚合,然后使用仅有一个键的第二步将部分聚合“合并”为单个聚合。这样,计算的某些部分是并行化的,只有最后一步(希望减少数据负载)是非并行的。使用Kafka Streams,您需要“手动”实现此方法。

© www.soinside.com 2019 - 2024. All rights reserved.