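I am trying to create a Kafka consumer that subscribes to a topic and polls it periodically. The KafkaConsumer is created as a @Bean and is used by an executor service that is started in a @PostConstruct method. In another use case I need to read the latest offset or replay from a specific offset, but when I try to create a new Kafka consumer for that, it throws: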
javax.management.InstanceAlreadyExistsException: kafka.consumer
Is it possible to get the latest offset, or to replay from a given offset, using the same KafkaConsumer?
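import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConsumerClientConfig {
    // KafkaProperties is injected as a bean-method parameter; the original snippet did not show where it came from.
    @Bean
    public KafkaConsumer<String, String> kafkaClientConfig(KafkaProperties kafkaProperties) {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(MetadataServiceUtil.getConsumerProperties(kafkaProperties));
        log.info("Kafka Consumer created successfully with SecurityProtocol={}", kafkaProperties.getSecurityProtocol());
        kafkaConsumer.subscribe(List.of(kafkaProperties.getTopicName()));
        return kafkaConsumer;
    }
}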
Kafka client code that creates the Kafka consumer:
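import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@Data
public class PipelineKafkaClient {
    @Autowired
    private KafkaConsumer<String, String> kafkaConsumer;
    @Autowired
    private KafkaProperties kafkaProperties;
    // Declared here because startup() assigns it; the original snippet omitted the field.
    private ExecutorService executorService;

    @PostConstruct
    void startup() {
        log.info("*******Starting event listener********");
        executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new EventConsumer(kafkaConsumer));
        // In the EventConsumer I have the code below
        /* while(true) {
         *     ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
         *     processBatch(records);
         *     kafkaConsumer.commitSync();
         * }
         */
    }

    public void replayFromSpecificOffset(long offsetToReadFrom, long offsetReadTo) {
        log.info("Replaying messages");
        Properties properties = MetadataServiceUtil.getConsumerProperties(kafkaProperties);
        // Creating this second consumer is what throws the InstanceAlreadyExistsException.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // CODE to read from specific offset
    }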
}
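One thing that may be relevant to the exception: an InstanceAlreadyExistsException under kafka.consumer is commonly reported when two consumers in the same JVM register their JMX MBeans under the same client.id. Assuming MetadataServiceUtil.getConsumerProperties sets a fixed client.id (that is a guess, its code is not shown here), a minimal sketch of giving the replay consumer its own id could look like this. The class name ReplayConsumerProps, the helper withUniqueClientId, and the placeholder values in main are hypothetical and only for illustration.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper, not part of the code in the question.
final class ReplayConsumerProps {

    // Returns a copy of the given consumer properties with a fresh client.id,
    // so a second consumer does not reuse the id of the first one.
    // The input Properties object is left unchanged.
    static Properties withUniqueClientId(Properties base, String prefix) {
        Properties copy = new Properties();
        copy.putAll(base); // copies explicitly set entries only, not nested defaults
        copy.setProperty("client.id", prefix + "-" + UUID.randomUUID());
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder value
        base.setProperty("group.id", "pipeline-consumer");       // placeholder value
        base.setProperty("client.id", "pipeline-client");        // placeholder value

        Properties replay = withUniqueClientId(base, "replay");
        System.out.println("base client.id   = " + base.getProperty("client.id"));
        System.out.println("replay client.id = " + replay.getProperty("client.id"));
    }
}
```

In replayFromSpecificOffset, the result of withUniqueClientId(properties, "replay") would be passed to the new KafkaConsumer instead of the shared properties. This only addresses the duplicate registration; it does not answer whether the polling consumer itself can be reused for the replay.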