Spring KafkaListener disconnects and stops consuming - CommitFailedException


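I have a Kafka topic that (intentionally) never purges messages. I want to keep consuming messages from it, even if my consumer is offline for days, weeks, or months. As I understand it, this may not be possible, so I'll start with my solution.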

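When I run it, I get the following exception, after which my @KafkaListener never seems to consume messages again, even though new messages keep being published to "topic1". I tried restarting my Spring Boot application. I even tried resetting the group offsets back to 0 with: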

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group aggregator-txnstokafka-group-ethereum --topic topic1 --reset-offsets --to-earliest --execute

But consumption never resumes.

How can I get my @KafkaListener to resume consuming messages?

Exception:

2023-10-05 11:14:31,755 ERROR org.springframework.kafka.core.DefaultKafkaProducerFactory [aggregator-txnstokafka-listener-ethereum-0-C-1] - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7de7fc0]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
        at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        ... 1 more
2023-10-05 11:14:31,760 ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer [aggregator-txnstokafka-listener-ethereum-0-C-1] - Transaction rolled back
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
        at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        ... 1 more
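
In both traces the top-level KafkaException only reports that the producer is in an error state; the informative part is the nested CommitFailedException. A custom error handler that wants to react to that specific failure has to walk the cause chain. The following is a minimal sketch of that idea using only the JDK. The class and method names (CauseChain, findCause) are hypothetical, and the exceptions in main are stand-ins so the example runs without kafka-clients on the classpath.

```java
// Hypothetical helper: finds a nested cause by class-name suffix.
final class CauseChain {

    /**
     * Returns the first throwable in the cause chain whose class name ends
     * with the given suffix, or null if there is none.
     */
    static Throwable findCause(Throwable top, String classNameSuffix) {
        Throwable current = top;
        int depth = 0;
        // The depth limit guards against a cyclic cause chain.
        while (current != null && depth < 50) {
            if (current.getClass().getName().endsWith(classNameSuffix)) {
                return current;
            }
            current = current.getCause();
            depth++;
        }
        return null;
    }

    public static void main(String[] args) {
        // Stand-ins for the real exceptions: IllegalStateException plays the
        // nested CommitFailedException, RuntimeException plays KafkaException.
        Throwable inner = new IllegalStateException(
                "The coordinator is not aware of this member.");
        Throwable outer = new RuntimeException(
                "Cannot execute transactional method because we are in an error state",
                inner);

        Throwable found = findCause(outer, "IllegalStateException");
        System.out.println(found != null ? found.getMessage() : "not found");
    }
}
```

With the real classes available, the suffix would be "CommitFailedException"; matching on a name suffix rather than a Class object is a choice made here only to keep the sketch dependency-free.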

On the server, I have configured:

offsets.retention.minutes=5256000
group.max.session.timeout.ms=2147483647

In my client, I have configured:

session.timeout.ms=2147483646
max.poll.interval.ms=900000
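
It may help to convert these settings into human-readable durations, because they bound different things. In current Kafka clients, heartbeats are sent from a background thread, so session.timeout.ms only detects a dead process, while max.poll.interval.ms bounds the time between two poll() calls. If a listener takes longer than that interval, the consumer leaves the group, and a later offset commit can be rejected with an error such as "The coordinator is not aware of this member". The sketch below uses only the JDK; the class name TimeoutSanityCheck and the 20-minute processing time are hypothetical, and whether slow processing is what actually happened here is an assumption, not something the question confirms.

```java
import java.time.Duration;

// Hypothetical helper that makes the configured timeouts easier to read.
final class TimeoutSanityCheck {

    /** Converts a millisecond setting such as max.poll.interval.ms. */
    static Duration ofMillisSetting(long millis) {
        return Duration.ofMillis(millis);
    }

    /** Converts a minute setting such as offsets.retention.minutes. */
    static Duration ofMinutesSetting(long minutes) {
        return Duration.ofMinutes(minutes);
    }

    /**
     * True when handling the records from one poll() takes longer than
     * max.poll.interval.ms, the case in which the consumer leaves the group.
     */
    static boolean exceedsPollInterval(Duration processingTime, long maxPollIntervalMs) {
        return processingTime.compareTo(Duration.ofMillis(maxPollIntervalMs)) > 0;
    }

    public static void main(String[] args) {
        // Values taken from the question.
        long offsetsRetentionMinutes = 5_256_000L;
        long sessionTimeoutMs = 2_147_483_646L;
        long maxPollIntervalMs = 900_000L;

        System.out.println("offsets.retention.minutes in days: "
                + ofMinutesSetting(offsetsRetentionMinutes).toDays());
        System.out.println("session.timeout.ms in whole days: "
                + ofMillisSetting(sessionTimeoutMs).toDays());
        System.out.println("max.poll.interval.ms in minutes: "
                + ofMillisSetting(maxPollIntervalMs).toMinutes());

        // Assumed processing time, for illustration only.
        Duration processingTime = Duration.ofMinutes(20);
        System.out.println("processing exceeds poll interval: "
                + exceedsPollInterval(processingTime, maxPollIntervalMs));
    }
}
```

The point of the comparison is that a very large session.timeout.ms does not extend the 15-minute limit set by max.poll.interval.ms.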

Here is a snippet of the code that pulls from topic1 and publishes to topic2 (the rest of the functionality has been removed for brevity).

@Autowired
@Qualifier("evmBlockToKakfaTemplate")
KafkaTemplate<Number, Object> destTemplate;

@KafkaListener(id = "aggregator-txnstokafka-listener-ethereum", groupId = "aggregator-txnstokafka-group-ethereum", topics = "topic1", containerFactory = "evmTxnsToKafkaContainerFactory")
public void onMessage(ConsumerRecord<Long, String> record, Acknowledgment acknowledgement) {
  // Do some stuff
  destTemplate.send("topic2", key, value);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> evmTxnsToKafkaContainerFactory(
        KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager,
        ConsumerFactory<? super String, ? super String> evmTxnToKakfaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setTransactionManager(evmTxnAggregatorTransactionManager);
    factory.setConsumerFactory(evmTxnToKakfaConsumerFactory);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setCommonErrorHandler(new TransactionAggregatorErrorHandler());
    factory.setConcurrency(1);
    factory.setBatchListener(false);

    return factory;
}

@Bean
public KafkaTemplate<Number, Object> evmTxnToKakfaTemplate() {
    return new KafkaTemplate<Number, Object>(evmTxnToKakfaProducerFactory());
}

@Bean
public DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBroker);
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "aggregator-txnstokafka-" + chain);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "aggregator-txnstokafka-" + chain + "-");
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConsumerFactory<? super String, ? super BlockchainDataAggregator> evmTxnToKakfaConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, srcKafkaBroker);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "2147483646"); // Integer max value. About 24.8 days
    //config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ...);
    config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000"); // 15 minutes
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager(
        DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory) {
    KafkaTransactionManager<Number, Object> kafkaTransactionManager = new KafkaTransactionManager<>(
            evmTxnToKakfaProducerFactory);
    kafkaTransactionManager
            .setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    return kafkaTransactionManager;
}

We are using:

  • Kafka 2.12-3.2.0 on Ubuntu
  • Java 18
  • Spring Boot Starter 2.7.4 with Spring Kafka
  • "topic1" has 1 partition with 3 replicas (intentionally)
apache-kafka kafka-consumer-api spring-kafka
1 Answer

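I'm a little confused about the requirement here. However, there are a few things you can check.

  1. What is the retention bytes setting for the topic?
  2. When you reset the consumer to earliest, try changing the consumer group name and see whether the problem persists. This CommitFailedException can occur if you are trying to access an invalid offset.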