How to use Spring Kafka transactions with Spring Data JPA?


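How can I use Spring Kafka transactions together with Spring Data JPA transactions? The JPA transaction manager in my application disappears and this error is thrown:

A component required a bean named 'transactionManager' that could not be found.

Even when I create the JPA transaction manager bean myself, a transactional method seems to use only one of the two transaction managers, JpaTransactionManager or KafkaTransactionManager, so it rolls back either Kafka or JPA but never both. I then tried to follow this example: https://docs.spring.io/spring-kafka/reference/tips.html#ex-jdbc-sync but it still does not work and throws:

java.lang.IllegalStateException: Already value [org.springframework.jdbc.datasource.ConnectionHolder@6baee63b] for key [HikariDataSource (HikariPool-1)] bound to thread

Can someone help me solve this problem?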

Here is the code I implemented:

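import java.util.HashMap;
import java.util.Map;

import jakarta.persistence.EntityManagerFactory;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.orm.jpa.JpaTransactionManager;

@Configuration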
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        DefaultKafkaProducerFactory<Integer, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        factory.setTransactionIdPrefix("tx-");
        return factory;
    }


    @Bean
    @Primary
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory); 
    }

    @Bean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }
}

My producer method:

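import lombok.AllArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@AllArgsConstructor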
@Component
public class ProducerTest {
    private KafkaTemplate<Integer, String> kafkaTemplate;
    private CountryRepository countryRepository;

    @Transactional
    public void run() {
        for (int i = 0; i < 9; i++) {
            // Not rolled back.
            kafkaTemplate.send("test", Integer.toString(i));
            // Rolled back successfully. If I comment out this line,
            // Spring uses the kafkaTransactionManager bean instead.
            countryRepository.save(Country.builder().name("test").id(i).build());

            if (i > 3) {
                throw new RuntimeException("TEST ROLLBACK");
            }
        }
    }

}

My spring-boot version is 3.2.1.

spring spring-boot apache-kafka spring-data-jpa spring-kafka
1 Answer

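After a long time trying to find a solution to this problem, I came up with the following workaround. I split my method into two separate transactions and call them one after the other.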

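import java.util.UUID;
import java.util.function.Supplier;

import lombok.AllArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service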
public class TransactionCallService {
    @Transactional(value = "kafkaTransactionManager", propagation = Propagation.NESTED)
    public <T> T runInNewTransaction(Supplier<T> supplier) {
        return supplier.get();
    }
}

@AllArgsConstructor
@Component
public class ProducerTest {
    private KafkaTemplate<Integer, String> kafkaTemplate;
    private CountryRepository countryRepository;
    private TransactionCallService transactionCallService;

    @Transactional("transactionManager")
    public void run() {
        for (int i = 0; i < 10; i++) {
            countryRepository.save(Country.builder()
                    .name(UUID.randomUUID().toString().substring(0, 5)).id(i).build());
        }
        transactionCallService.runInNewTransaction(() -> {
            for (int i = 0; i < 10; i++) {
                kafkaTemplate.send("test", Integer.toString(i));
            }
            int c = 1 / 0; // deliberately throw an ArithmeticException to test rollback
            return null;
        });
    }
}

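However, this is the only approach that worked for me. If anyone can offer a better or different way to solve this problem, I would appreciate it.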
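
One way to see what this workaround does and does not guarantee is a plain-Java model of the commit ordering. The sketch below uses no Spring or Kafka APIs: FakeResource, NestedTxModel, and inTransaction are hypothetical names made up for illustration. It assumes the inner Kafka transaction commits when the supplier returns and the outer JPA transaction commits only after run() returns, which is what two separately managed, nested transactions would be expected to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for one transactional resource (the database or the Kafka producer). */
final class FakeResource {
    private final String name;
    private final boolean failOnCommit;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    FakeResource(String name, boolean failOnCommit) {
        this.name = name;
        this.failOnCommit = failOnCommit;
    }

    void write(String value) {
        pending.add(value);
    }

    void commit() {
        if (failOnCommit) {
            pending.clear(); // a failed commit discards the pending writes
            throw new IllegalStateException(name + " commit failed");
        }
        committed.addAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }

    List<String> committed() {
        return List.copyOf(committed);
    }
}

/** Models the shape of the workaround: an outer database transaction wrapping an inner Kafka one. */
final class NestedTxModel {

    /** Commits the resource when the work returns normally, rolls it back when the work throws. */
    static <T> T inTransaction(FakeResource resource, Supplier<T> work) {
        T result;
        try {
            result = work.get();
        } catch (RuntimeException e) {
            resource.rollback();
            throw e; // propagate so that an enclosing transaction also rolls back
        }
        resource.commit();
        return result;
    }

    /** Database writes first, then the Kafka sends in their own inner transaction. */
    static void run(FakeResource db, FakeResource kafka, boolean failInsideKafkaWork) {
        inTransaction(db, () -> {
            db.write("country-0");
            inTransaction(kafka, () -> {
                kafka.write("message-0");
                if (failInsideKafkaWork) {
                    throw new IllegalStateException("simulated failure");
                }
                return null;
            });
            return null;
        });
    }

    public static void main(String[] args) {
        FakeResource db = new FakeResource("db", true); // the database commit will fail
        FakeResource kafka = new FakeResource("kafka", false);
        try {
            run(db, kafka, false);
        } catch (IllegalStateException e) {
            System.out.println("outer commit failed: " + e.getMessage());
        }
        System.out.println("db committed:    " + db.committed());
        System.out.println("kafka committed: " + kafka.committed());
    }
}
```

In this model, a failure inside the Kafka block leaves both resources empty, which matches the rollback test in the answer. A failure at the database commit, however, happens after the Kafka side has already committed, so the message stays published. If the same ordering applies to the real transaction managers, the workaround is best-effort rather than atomic, and consumers may need to tolerate messages whose database rows were never written.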
