重置Spring Boot Kafka Stream应用程序以修改主题

问题描述 投票:0回答:1

我正在使用spring-kafka使用StreamsBuilderFactoryBean在Spring Boot应用程序中运行Kafka Stream。我通过删除并重新创建将某些主题中的分区数从100更改为20,但是现在在运行该应用程序时,出现以下错误:

Existing internal topic MyAppId-KSTREAM-AGGREGATE-STATE-STORE-0000000092-changelog has invalid partitions: expected: 20; actual: 100. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.

我无法访问类kafka.tools.StreamsResetter并尝试调用StreamsBuilderFactoryBean.getKafkaStreams.cleanup(),但它给出了NullPointerException。我该如何清理?

spring-boot apache-kafka apache-kafka-streams spring-kafka
1个回答
0
投票

相关文档位于here

步骤1:本地清理

对于使用StreamsBuilderFactoryBean的Spring Boot,第一步可以通过简单地将CleanerConfig添加到构造函数中来完成:

// Before
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config));
// After
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config), new CleanupConfig(true, true));

这可以在KafkaStreams.cleanUp()之前和start()之后都调用stop()方法。

步骤2:全局清除

对于第二步,在应用程序的所有实例停止的情况下,只需按照文档中的说明使用该工具:

# In kafka directory
bin/kafka-streams-application-reset.sh --application-id "MyAppId" --bootstrap-servers 1.2.3.4:9092 --input-topics x --intermediate-topics first_x,second_x,third_x --zookeeper 1.2.3.4:2181

这是什么:

对于任何指定的输入主题:将所有分区(对于消费者组application.id)将应用程序的提交的使用者偏移量重置为“主题的开头”。

对于任何指定的中间主题:跳至主题末尾,即,将所有分区的应用程序的已提交使用者偏移量设置为每个分区的logSize(对于使用者组application.id)。

对于任何内部主题:删除内部主题(这还将删除已提交的对应的已提交偏移量。]

© www.soinside.com 2019 - 2024. All rights reserved.