如何使用Spring kafka事务和Spring data jpa事务?我的应用程序中的 jpa 事务管理器消失并抛出此错误:
A component required a bean named 'transactionManager' that could not be found.
即使我创建了 JPA 事务管理器 bean,事务方法似乎只使用两个事务管理器 JpaTransactionManager
或 KafkaTransactionManager
之一,因此它只回滚kafka 或 jpa 两者都不是。之后我尝试像这个例子一样实现:https://docs.spring.io/spring-kafka/reference/tips.html#ex-jdbc-sync但它仍然不起作用并抛出java.lang.IllegalStateException: Already value [org.springframework.jdbc.datasource.ConnectionHolder@6baee63b] for key [HikariDataSource (HikariPool-1)] bound to thread
。有人可以帮我解决这个问题吗?
这是我实现的代码:
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<Integer, String> producerFactory() {
DefaultKafkaProducerFactory<Integer, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
factory.setTransactionIdPrefix("tx-");
return factory;
}
@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory);
}
@Bean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
}
我的制作方法:
@AllArgsConstructor
@Component
public class ProducerTest {
private KafkaTemplate<Integer, String> kafkaTemplate;
private CountryRepository countryRepository;
@Transactional
public void run() {
for (int i = 0; i < 9; i++) {
kafkaTemplate.send("test", Integer.toString(i)); // Not rolled back
countryRepository.save(Country.builder().name("test").id(i).build()); // Rolled back successfully.If I comment this line, Spring will use kafkaTransactionManager bean.
if (i > 3) {
throw new RuntimeException("TEST ROLLBACK");
}
}
}
}
我的spring-boot版本是3.2.1。
经过长时间尝试寻找问题的解决方案,我想出了这个解决方法。我将我的方法分成两个单独的事务,并一个接一个地调用它们。
@Service
public class TransactionCallService {
@Transactional(value = "kafkaTransactionManager", propagation = Propagation.NESTED)
public <T> T runInNewTransaction(Supplier<T> supplier) {
return supplier.get();
}
}
@AllArgsConstructor
@Component
public class ProducerTest {
private KafkaTemplate<Integer, String> kafkaTemplate;
private CountryRepository countryRepository;
private TransactionCallService transactionCallService;
@Transactional("transactionManager")
public void run() {
for (int i = 0; i < 10; i++) {
countryRepository.save(Country.builder()
.name(UUID.randomUUID().toString().substring(0, 5)).id(i).build());
}
transactionCallService.runInNewTransaction(() -> {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send("test", Integer.toString(i));
}
int c = 1 / 0; // produce a error to test rollback
return null;
});
}
}
但是,这是唯一对我有用的方法。如果有人可以提供更好或不同的方法来解决这个问题,我将不胜感激。