Apache Storm与Kafka抵消管理

问题描述 投票:0回答:2

我使用Kafka作为源构建了一个使用Storm的示例拓扑。这是一个我需要解决方案的问题。

每次我杀死拓扑并再次启动它时,拓扑从头开始处理。

假设主题X中的消息A由拓扑处理,然后我终止拓扑。

现在当我再次提交拓扑并且消息A仍然存在主题X时。它再次被处理。

是否有解决方案,可能是某种抵消管理来处理这种情况。

apache-kafka apache-storm
2个回答
1
投票

您不应该将storm-kafka用于新代码,因为底层客户端API在Kafka中已弃用,并且从2.0.0开始删除,因此不推荐使用它。相反,使用storm-kafka-client

使用storm-kafka-client,您需要设置组ID和第一个轮询偏移策略。

KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
            .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
            .build();

以上将使你的喷口在你第一次启动时从最早的偏移开始,然后如果你重新启动它就会从它停止的地方开始。 Kafka使用组ID在重新启动时识别喷口,因此它可以返回存储的偏移检查点。其他偏移策略的行为会有所不同,您可以检查FirstPollOffsetStrategy枚举的javadoc。

喷口会检查它定期得到多远,配置中还有一个设置来控制它。检查点由配置中的setProcessingGuarantee设置控制,并且可以设置为至少一次(仅检查点确认偏移),最多一次(在喷口发出消息之前的检查点)和“任何时间”(定期检查点,忽略acks)。

看一下Storm https://github.com/apache/storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L93附带的示例拓扑之一。


1
投票

确保在创建spoutconfig时它具有固定的spout id,通过它可以在重新启动后识别自身。

来自官方风暴网站:

重要:重新部署拓扑时,请确保未修改SpoutConfig.zkRoot和SpoutConfig.id的设置,否则spout将无法从ZooKeeper读取其先前的消费者状态信息(即偏移量) - 这可能是导致意外行为和/或数据丢失,具体取决于您的使用案例。

© www.soinside.com 2019 - 2024. All rights reserved.