如何在 Spring Cloud Stream 中为生产者专用事务(db + kafka)启用数据库 + kafka 事务?

问题描述 投票:0回答:1

我们有一个事件驱动的分布式架构,具有使用 Spring Cloud Stream 的独立生产者和消费者微服务,应用程序需要执行以下操作:在生产者中,数据库插入/更新,然后向 Kafka 发布消息。但是,事务仅适用于数据库,不适用于 kafka。数据库事务会因错误而回滚,但消费微服务仍会发送和读取 kafka 消息。

使用的版本:spring-kafka 2.8.11、spring-boot 2.7.7、spring-cloud 版本 2021.0.5

为了启用事务,在 Spring Boot 应用程序类上使用 @EnableTransactionManagement 注解。对于仅限生产者的事务,我尝试使用 @Transactional 和文档中找到的其他一些替代方案,但它们都不起作用。测试事务时,我在代码中发送kafka消息后手动抛出RuntimeException。

示例代码(仅需要生产者交易):-

@Autowired
private final StreamBridge streamBridge;

@Transactional
public void sendDbAndKafkaUpdate() {
    // db write here...

}

private void sendKafkaMessage() {
    streamBridge.send("topic-name", messageEvent);

    //throw a RuntimeException here.
}

用于启用生产者事务的应用程序yaml配置:


spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: ${kafka.unique.tx.id.per.instance}  //this is set per service instance
            producer:
              configuration:
                retries: 1
                acks: all
    
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
                schema.registry.url: ${kafka.schema.registry.url}

我已经搜索了文档,但不太清楚处理此问题的推荐方法是什么?参考文档(请参阅仅生产者交易部分):- https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream- inder-kafka.html#kafka-transactional-binder

文档建议使用以下代码来启用仅生产者交易:-

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${kafka.unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

我已经尝试过,但是如果我在向 kafka 发布消息后手动抛出 RuntimeException,则这不起作用。数据库事务回滚,但 kafka 消息仍然发送(并由消费应用程序消费)

问题

  1. 如果使用 StreamBridge 向主题发送消息,绑定器名称应该是什么。它是否指的是 apache-kafka-binder 本身,这意味着如果仅使用该活页夹,则 null 就可以了?或者这与应用程序 yaml 中配置的绑定有关(注意:在使用streamBridge的情况下不使用输出绑定)?

  2. 更重要的是,如何同步仅生产者事务,其中数据库更新后发布kafka消息,考虑以下几点:-

  • 上面提到的文档建议使用 ChainedTransactionManager 来同步事务(“如果您希望将仅生产者事务与其他事务管理器的事务同步,请使用 ChainedTransactionManager。”)但是,请注意 ChainedTransactionManager 已被弃用。
  • 另请注意,KafkaTemplate 并未直接在应用程序中使用(考虑到 SCS 提供了抽象)
transactions spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka spring-cloud-function
1个回答
0
投票

对于如下所示的设置,您不需要定义自定义 Kafka 事务管理器。

@Autowired
private final StreamBridge streamBridge;

@Transactional
public void sendDbAndKafkaUpdate() {
    // db write here...

}

private void sendKafkaMessage() {
    streamBridge.send("topic-name", messageEvent);

    //throw a RuntimeException here.
}

它应该是端到端的交易。

@Transactional
注释将使用数据库事务管理器作为主要管理器(例如
JpaTransactionManager
)。我假设 db txn 管理器是由 Spring Boot 在您的情况下自动配置的。当事务拦截器拦截到调用时,会启动一个新的db事务,并在该事务下执行该方法。由于您提供了
transaction-id-prefix
,因此当调用
StreamBridge#send
方法时,操作将以事务方式完成。然而,
KafkaTemplate
使用的内部
StreamBridge
将 Kafka 事务与现有 JPA 事务同步。退出该方法时,主事务首先提交,然后是同步事务。如果 Kafka 发送后抛出异常,两个事务都会回滚。

您确定Kafka事务没有回滚吗?你是如何验证这一点的?在您的下游消费者中,您是否使用了

isolation.level
read_committed
? (
spring.cloud.stream.kafka.binder.configuration.isolation.level
)

另一件事要记住,如果您在应用程序中有自动配置的

TransactionManager
,那么您不需要在应用程序上添加
@EnableTransactionManagement
,因为 Spring Boot 已经应用了它。

您不需要在您的场景中使用任何链式事务管理器。仅当您想更改事务提交的顺序时才需要这样做。例如如果您希望 Kafka 事务而不是 DB 事务先提交,您可以使用链式 TM 或嵌套

@Transactional
方法调用。但是,通过查看您的解释,您的应用程序并不保证这些高级设置。

如果仍然无法正常工作,请随意创建一个小型示例应用程序,我们可以在其中重现问题。

© www.soinside.com 2019 - 2024. All rights reserved.