Spring Kafka 与数据库/MQ 交互的事务管理

问题描述 投票:0回答:1

我们正在尝试在 Spring Kafka 消费者中实现事务管理。

我们让 Kafka 消费者监听来自主题 A 的消息 -> 数据库更新/插入 -> 生成关于主题 B 的 Kafka 消息。

我面临的问题是,当数据库事务提交失败时,主题B的发送操作不会回滚。所以系统处于不一致状态。

其他场景按预期工作。

例如:

  1. 从kafka读取消息 -> db交互 -> kafka发送:如果kafka发送失败,则db txn未提交,读取消息的偏移量未提交。

  2. 从kafka读取消息 -> db交互 -> kafka send :如果消费消息的offset提交失败,kafka send和db不会提交。

PS:我知道kafka不支持XA事务。我确实看到一些资源提到了 ChaintedTransactionManager 的使用,根据文档,它已从 Spring Data Core 2.5 版本中弃用,因此最好不想使用它。任何建议表示赞赏。

我将代码片段放在下面 -

主要:

@KafkaHandler
@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void receiveCreationMessage(String event){
 
  // saves into database
  dao.saveDraft(event);
  
  // sends kafka message
  sendMQAdapterKafkaEvent(initiateReqMq, convertedEvent);
 
}
 
 @Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
 public void saveDraft(@NonNull String event) {
 
    entityManager.joinTransaction();
    entityManager.persist(event);
}
 
@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void sendMQAdapterKafkaEvent(String mqName, Object message) throws 
 
            kafkaTemplate.send(topicName, msgKey, message)
}

应用程序.属性

spring.kafka.producer.transaction-id-prefix=txnId-
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.properties.transactional-id=trans-id-

Kafka监听器容器工厂:

@Bean(KafkaConstants.ContainerFactoryNames.MANUAL_COMMIT_CONTAINER_FACTORY)
    @Autowired()
    public ConcurrentKafkaListenerContainerFactory<Object, Object> manualCommitKafkaListenerContainerFactory(
            @NonNull final ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            @NonNull final KafkaProperties kafkaProperties,
            @NonNull final RecordMessageConverter converter,
            @NonNull final ErrorHandler errorHandler,
            @NotNull final RetryTemplate retryTemplate,
            @NotNull @Qualifier("kafkaTxM") @Lazy final KafkaTransactionManager kafkaTransactionManager) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

        val consumerProperties = kafkaProperties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        val consumerFactory = new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);


        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0L)));
        factory.setAfterRollbackProcessor(new TestRollbackProcesor(errorHandler, kafkaTransactionManager, new FixedBackOff(0L, 0L)));
        configurer.configure(factory, consumerFactory);

        return factory;
    }

Kafka 生产者工厂 –

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(final KafkaProperties kafkaProperties) throws IOException {

    val factory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    val transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
        factory.setTransactionIdPrefix(transactionIdPrefix);
    }
    factory.transactionCapable();
    return factory;
}

Kafka tx 管理器 bean –

@Bean(name = "kafkaTxM")
public KafkaTransactionManager kafkaTransactionManager(final MyConfiguration myConfig,
                                                       final KafkaProperties kafkaProperties) throws IOException {
    KafkaTransactionManager ktm = new KafkaTransactionManager(kafkaProducerFactory(myConfig, kafkaProperties));;
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}
spring spring-boot transactions spring-kafka spring-transactions
1个回答
0
投票

13:20:20.798 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - 事务回滚

Kafka
交易确实被回滚了。

也许您的消费者正在使用默认值

isolation.level
(
read_uncommitted
)。

使用 Kafka,生产者记录始终写入日志,后跟一个标记块,指示事务是已提交还是已回滚。

消费者必须拥有

isolation.level
read_committed
才能跳过回滚的记录。

© www.soinside.com 2019 - 2024. All rights reserved.