我使用Kafka作为源构建了一个使用Storm的示例拓扑。这是一个我需要解决方案的问题。
每次我杀死拓扑并再次启动它时,拓扑从头开始处理。
假设主题X中的消息A由拓扑处理,然后我终止拓扑。
现在当我再次提交拓扑并且消息A仍然存在主题X时。它再次被处理。
是否有解决方案,可能是某种抵消管理来处理这种情况。
您不应该将storm-kafka
用于新代码,因为底层客户端API在Kafka中已弃用,并且从2.0.0开始删除,因此不推荐使用它。相反,使用storm-kafka-client
。
使用storm-kafka-client
,您需要设置组ID和第一个轮询偏移策略。
KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
.build();
以上将使你的喷口在你第一次启动时从最早的偏移开始,然后如果你重新启动它就会从它停止的地方开始。 Kafka使用组ID在重新启动时识别喷口,因此它可以返回存储的偏移检查点。其他偏移策略的行为会有所不同,您可以检查FirstPollOffsetStrategy枚举的javadoc。
喷口会检查它定期得到多远,配置中还有一个设置来控制它。检查点由配置中的setProcessingGuarantee
设置控制,并且可以设置为至少一次(仅检查点确认偏移),最多一次(在喷口发出消息之前的检查点)和“任何时间”(定期检查点,忽略acks)。
确保在创建spoutconfig时它具有固定的spout id,通过它可以在重新启动后识别自身。
来自官方风暴网站:
重要:重新部署拓扑时,请确保未修改SpoutConfig.zkRoot和SpoutConfig.id的设置,否则spout将无法从ZooKeeper读取其先前的消费者状态信息(即偏移量) - 这可能是导致意外行为和/或数据丢失,具体取决于您的使用案例。