我们正在尝试在 Spring Kafka 消费者中实现事务管理。
我们让 Kafka 消费者监听来自主题 A 的消息 -> 数据库更新/插入 -> 生成关于主题 B 的 Kafka 消息。
我面临的问题是,当数据库事务提交失败时,主题B的发送操作不会回滚。所以系统处于不一致状态。
其他场景按预期工作。
例如:
从kafka读取消息 -> db交互 -> kafka发送:如果kafka发送失败,则db txn未提交,读取消息的偏移量未提交。
从kafka读取消息 -> db交互 -> kafka send :如果消费消息的offset提交失败,kafka send和db不会提交。
PS:我知道kafka不支持XA事务。我确实看到一些资源提到了 ChaintedTransactionManager 的使用,根据文档,它已从 Spring Data Core 2.5 版本中弃用,因此最好不想使用它。任何建议表示赞赏。
我将代码片段放在下面 -
主要:
@KafkaHandler
@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void receiveCreationMessage(String event){
// saves into database
dao.saveDraft(event);
// sends kafka message
sendMQAdapterKafkaEvent(initiateReqMq, convertedEvent);
}
@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void saveDraft(@NonNull String event) {
entityManager.joinTransaction();
entityManager.persist(event);
}
@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void sendMQAdapterKafkaEvent(String mqName, Object message) throws
kafkaTemplate.send(topicName, msgKey, message)
}
应用程序.属性
spring.kafka.producer.transaction-id-prefix=txnId-
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.properties.transactional-id=trans-id-
Kafka监听器容器工厂:
@Bean(KafkaConstants.ContainerFactoryNames.MANUAL_COMMIT_CONTAINER_FACTORY)
@Autowired()
public ConcurrentKafkaListenerContainerFactory<Object, Object> manualCommitKafkaListenerContainerFactory(
@NonNull final ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
@NonNull final KafkaProperties kafkaProperties,
@NonNull final RecordMessageConverter converter,
@NonNull final ErrorHandler errorHandler,
@NotNull final RetryTemplate retryTemplate,
@NotNull @Qualifier("kafkaTxM") @Lazy final KafkaTransactionManager kafkaTransactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
val consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
val consumerFactory = new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0L)));
factory.setAfterRollbackProcessor(new TestRollbackProcesor(errorHandler, kafkaTransactionManager, new FixedBackOff(0L, 0L)));
configurer.configure(factory, consumerFactory);
return factory;
}
Kafka 生产者工厂 –
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(final KafkaProperties kafkaProperties) throws IOException {
val factory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
val transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
factory.transactionCapable();
return factory;
}
Kafka tx 管理器 bean –
@Bean(name = "kafkaTxM")
public KafkaTransactionManager kafkaTransactionManager(final MyConfiguration myConfig,
final KafkaProperties kafkaProperties) throws IOException {
KafkaTransactionManager ktm = new KafkaTransactionManager(kafkaProducerFactory(myConfig, kafkaProperties));;
ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}
13:20:20.798 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - 事务回滚
Kafka
交易确实被回滚了。
也许您的消费者正在使用默认值
isolation.level
(read_uncommitted
)。
使用 Kafka,生产者记录始终写入日志,后跟一个标记块,指示事务是已提交还是已回滚。
消费者必须拥有
isolation.level
read_committed
才能跳过回滚的记录。