使用jpa和kafka的示例Spring事务?

问题描述 投票:1回答:1

在将Spring Boot从2.1.11版本升级到2.2.5之后,kafka客户端会在提交jpa事务之前向代理生成消息。如果不使用链接的kafka事务管理器,则事务运行正常。 2.1.x和2.2.x之间的事务中是否引入了向后兼容性更改?有人可以提供涵盖JPA和Kafka的任何工作交易经理吗?

我只为JPA使用了以下事务管理器

  @Bean
  @Primary
  public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
    final JpaTransactionManager txManager = new JpaTransactionManager();
    txManager.setEntityManagerFactory(emf);
    return txManager;
  }

我在kafka交易中使用了以下属性:

spring.cloud.stream.kafka.default.consumer.configuration.isolation.level:read_committedspring.cloud.stream.kafka.binder.transaction.transaction-id前缀:xyz-0spring.kafka.producer.transaction-id-prefix:xyz-0

spring-boot kafka-producer-api
1个回答
0
投票

我们可以使用jpa和kafka创建链接的交易。我们可以使用BinderFactory来获取kafka活页夹并创建活页夹kafka事务管理器。

@Bean(name = "chainedTransactionManager")
  @Primary
  public PlatformTransactionManager chainedTransactionManager(JpaTransactionManager jpaTM,
      BinderFactory binders) {
    Binder<MessageChannel,?,?> binder = binders.getBinder("kafka", MessageChannel.class);
    if (binder instanceof KafkaMessageChannelBinder) {
      ProducerFactory<byte[], byte[]> pf =
          ((KafkaMessageChannelBinder) binder).getTransactionalProducerFactory();
      KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
      ktm.setTransactionSynchronization(
          AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
      return new ChainedKafkaTransactionManager<Object, Object>(jpaTM, ktm);
    } else {
      return jpaTM;
    }
  }```
© www.soinside.com 2019 - 2024. All rights reserved.