我有一个 Kafka 主题,它从不(故意)清除消息。我想不断地消费来自它的消息,即使我的消费者离线几天/几周/几个月。据我了解,这可能是不可能的,所以我从我的解决方案开始。
在运行它时,我收到以下异常,之后我的 @KafkaListener 似乎再也不会消费消息,即使新消息一直被放入“topic1”中。我尝试重新启动我的 Spring Boot 应用程序。我什至尝试使用以下方法将组偏移重置回 0:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group aggregator-txnstokafka-group-ethereum --topic topic1 --reset-offsets --to-earliest --execute
但消费永远不会恢复。
如何让我的 @KafkaListener 恢复消费消息?
例外:
2023-10-05 11:14:31,755 ERROR org.springframework.kafka.core.DefaultKafkaProducerFactory [aggregator-txnstokafka-listener-ethereum-0-C-1] - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7de7fc0]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
... 1 more
2023-10-05 11:14:31,760 ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer [aggregator-txnstokafka-listener-ethereum-0-C-1] - Transaction rolled back
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
... 1 more
在服务器上,我配置了:
offsets.retention.minutes=5256000
group.max.session.timeout.ms=2147483647
在我的客户端中,我配置了
session.timeout.ms=2147483646
max.poll.interval.ms=900000
这是从 topic1 提取并发布到 topic2 的代码片段(为了简洁起见,此处删除了其余功能)。
@Autowired
@Qualifier("evmBlockToKakfaTemplate")
KafkaTemplate<Number, Object> destTemplate;
@KafkaListener(id = "aggregator-txnstokafka-listener-ethereum", groupId = "aggregator-txnstokafka-group-ethereum", topics = "topic1", containerFactory = "evmTxnsToKafkaContainerFactory")
public void onMessage(ConsumerRecord<Long, String> record, Acknowledgment acknowledgement) {
// Do some stuff
destTemplate.send("topic2", key, value);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> evmTxnsToKafkaContainerFactory(
KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager,
ConsumerFactory<? super String, ? super String> evmTxnToKakfaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setTransactionManager(evmTxnAggregatorTransactionManager);
factory.setConsumerFactory(evmTxnToKakfaConsumerFactory);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setCommonErrorHandler(new TransactionAggregatorErrorHandler());
factory.setConcurrency(1);
factory.setBatchListener(false);
return factory;
}
@Bean
public KafkaTemplate<Number, Object> evmTxnToKakfaTemplate() {
return new KafkaTemplate<Number, Object>(evmTxnToKakfaProducerFactory());
}
@Bean
public DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBroker);
config.put(ProducerConfig.CLIENT_ID_CONFIG, "aggregator-txnstokafka-" + chain);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "aggregator-txnstokafka-" + chain + "-");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory<? super String, ? super BlockchainDataAggregator> evmTxnToKakfaConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, srcKafkaBroker);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "2147483646"); // Integer max value. About 24.8 days
//config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ...);
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000"); // 15 minutes
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager(
DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory) {
KafkaTransactionManager<Number, Object> kafkaTransactionManager = new KafkaTransactionManager<>(
evmTxnToKakfaProducerFactory);
kafkaTransactionManager
.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
return kafkaTransactionManager;
}
我们使用...
对这里的要求有点困惑。但是,您可以检查一些事情。