我们有一个事件驱动的分布式架构,具有使用 Spring Cloud Stream 的独立生产者和消费者微服务,应用程序需要执行以下操作:在生产者中,数据库插入/更新,然后向 Kafka 发布消息。但是,事务仅适用于数据库,不适用于 kafka。数据库事务会因错误而回滚,但消费微服务仍会发送和读取 kafka 消息。
使用的版本:spring-kafka 2.8.11、spring-boot 2.7.7、spring-cloud 版本 2021.0.5
为了启用事务,在 Spring Boot 应用程序类上使用 @EnableTransactionManagement 注解。对于仅限生产者的事务,我尝试使用 @Transactional 和文档中找到的其他一些替代方案,但它们都不起作用。测试事务时,我在代码中发送kafka消息后手动抛出RuntimeException。
示例代码(仅需要生产者交易):-
@Autowired
private final StreamBridge streamBridge;
@Transactional
public void sendDbAndKafkaUpdate() {
// db write here...
}
private void sendKafkaMessage() {
streamBridge.send("topic-name", messageEvent);
//throw a RuntimeException here.
}
用于启用生产者事务的应用程序yaml配置:
spring:
cloud:
stream:
kafka:
binder:
transaction:
transaction-id-prefix: ${kafka.unique.tx.id.per.instance} //this is set per service instance
producer:
configuration:
retries: 1
acks: all
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
schema.registry.url: ${kafka.schema.registry.url}
我已经搜索了文档,但不太清楚处理此问题的推荐方法是什么?参考文档(请参阅仅生产者交易部分):- https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream- inder-kafka.html#kafka-transactional-binder
文档建议使用以下代码来启用仅生产者交易:-
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${kafka.unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
我已经尝试过,但是如果我在向 kafka 发布消息后手动抛出 RuntimeException,则这不起作用。数据库事务回滚,但 kafka 消息仍然发送(并由消费应用程序消费)
问题
如果使用 StreamBridge 向主题发送消息,绑定器名称应该是什么。它是否指的是 apache-kafka-binder 本身,这意味着如果仅使用该活页夹,则 null 就可以了?或者这与应用程序 yaml 中配置的绑定有关(注意:在使用streamBridge的情况下不使用输出绑定)?
更重要的是,如何同步仅生产者事务,其中数据库更新后发布kafka消息,考虑以下几点:-
对于如下所示的设置,您不需要定义自定义 Kafka 事务管理器。
@Autowired
private final StreamBridge streamBridge;
@Transactional
public void sendDbAndKafkaUpdate() {
// db write here...
}
private void sendKafkaMessage() {
streamBridge.send("topic-name", messageEvent);
//throw a RuntimeException here.
}
它应该是端到端的交易。
@Transactional
注释将使用数据库事务管理器作为主要管理器(例如 JpaTransactionManager
)。我假设 db txn 管理器是由 Spring Boot 在您的情况下自动配置的。当事务拦截器拦截到调用时,会启动一个新的db事务,并在该事务下执行该方法。由于您提供了 transaction-id-prefix
,因此当调用 StreamBridge#send
方法时,操作将以事务方式完成。然而,KafkaTemplate
使用的内部 StreamBridge
将 Kafka 事务与现有 JPA 事务同步。退出该方法时,主事务首先提交,然后是同步事务。如果 Kafka 发送后抛出异常,两个事务都会回滚。
您确定Kafka事务没有回滚吗?你是如何验证这一点的?在您的下游消费者中,您是否使用了
isolation.level
或 read_committed
? (spring.cloud.stream.kafka.binder.configuration.isolation.level
)
另一件事要记住,如果您在应用程序中有自动配置的
TransactionManager
,那么您不需要在应用程序上添加 @EnableTransactionManagement
,因为 Spring Boot 已经应用了它。
您不需要在您的场景中使用任何链式事务管理器。仅当您想更改事务提交的顺序时才需要这样做。例如如果您希望 Kafka 事务而不是 DB 事务先提交,您可以使用链式 TM 或嵌套
@Transactional
方法调用。但是,通过查看您的解释,您的应用程序并不保证这些高级设置。
如果仍然无法正常工作,请随意创建一个小型示例应用程序,我们可以在其中重现问题。