Kafka Consumer InstanceAlreadyExistsException

问题描述 投票:0回答:0

我正在尝试创建一个订阅主题并定期轮询的kafka消费者。在@Bean 下面创建 kafkaconsumer,它由执行程序服务在 @postconstruct 中使用。在其他用例中,我需要读取最新的偏移量或从特定偏移量重播,而我尝试创建一个新的 kafka cosumer,它正在抛出

javax.management.InstanceAlreadyExistsException: kafka.consumer

是否可以使用 kafkaConsumer 获取最新的偏移量或使用相同的 kafka 消费者重播一些偏移量。

@Configuration
@Slf4j
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConsumerClientConfig {
    @Bean
    public KafkaConsumer<String, String> kafkaClientConfig() {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(MetadataServiceUtil.getConsumerProperties(kafkaProperties));
    log.info("Kafka Consumer created successfully with SecurityProtocol={}", kafkaProperties.getSecurityProtocol());
    kafkaConsumer.subscribe(List.of(kafkaProperties.getTopicName()));
    return kafkaConsumer;
    }
}

Kafka客户端代码创建kafka消费者

@Component
@Slf4j
@Data
public class PipelineKafkaClient {

    @Autowired
    private KafkaConsumer kafkaConsumer;
  
    @Autowired
    private KafkaProperties kafkaProperties;

    @PostConstruct
    void startup() {
        log.info("*******Starting event listener********");
        executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new EventConsumer(kafkaConsumer));
        // In the EventConsumer i have the code as below
        /*  while(true) {
         *   ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
         *   processBatch(records);
         *   kafkaConsumer.commitSync();
         * }
         */

    }

    public void replayFromSpecificOffset(long offsetToReadFrom, long offsetReadTo) {
        log.info("Replaying messages");
        Properties properties = MetadataServiceUtil.getConsumerProperties(kafkaProperties);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // CODE to read from specific offset    
    }

}@Component
@Slf4j
@Data
public class PipelineKafkaClient {

    @Autowired
    private KafkaConsumer kafkaConsumer;
  
    @Autowired
    private KafkaProperties kafkaProperties;

    @PostConstruct
    void startup() {
        log.info("*******Starting event listener********");
        executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new EventConsumer(kafkaConsumer));
        // In the EventConsumer i have the code as below
        /*  while(true) {
         *   ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
         *   processBatch(records);
         *   kafkaConsumer.commitSync();
         * }
         */

    }

    public void replayFromSpecificOffset(long offsetToReadFrom, long offsetReadTo) {
        log.info("Replaying messages");
        Properties properties = MetadataServiceUtil.getConsumerProperties(kafkaProperties);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // CODE to read from specific offset    
    }

}
java apache-kafka kafka-consumer-api
© www.soinside.com 2019 - 2024. All rights reserved.