Spring云流kafka流binder——函数式编程模型中的@Transactional注解

问题描述 投票:0回答:1

如果事件处理过程中发生异常,我无法执行数据库事务回滚。我正在使用 Spring Cloud Stream 和 Kafka Stream Binder - 以及函数式编程模型。

我从主题中检索事件,然后如果记录未在那里注册,则将某些内容放入数据库中,然后如果满足某些条件,则生成新的 kafka 消息。如果出现问题,我想回滚数据库事务 - 并防止生成记录。

pom

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

我在PurchaseEventsProcessor中有方法

@Transactional
public Function<KStream<Long, Event>, KStream<Long, Event>> processPurchase() {

    return inputStream -> inputStream
            .filter(this::isValidPurchaseEvent)
            .filter((unused, event) -> Arrays.asList(allowedCountries).contains(event.country().name()))
            .mapValues(event -> customerHistoryService.registerPurchase(
                    event.timestamp(),
                    event.country(),
                    event.customerId(),
                    event.uuid(),
                    new BigDecimal(event.payload().get(REVENUE)))
            )
            .filter((unused, customerHistoryOptional) -> customerHistoryOptional.isPresent())
            .mapValues(customerHistoryOptional -> eventCreator.createRevenueIncreaseEvent(customerHistoryOptional.get()))
}

并在@Configuration中我添加了

@Bean
public Function<KStream<Long, Event>, KStream<Long, Event>> processPurchase() {
    return purchaseEventsProcessor.processPurchase();
}

如果在 eventCreator.createRevenueIncreaseEvent 期间抛出一些异常,则不会生成新消息 - 但数据库记录不会回滚。 @Transactional 注释是否有可能在这种方法中起作用?或者它在创建过程中只执行一次,并且以后永远不会调用代理。

此外,如果在处理消息期间发生某些异常,事务将不会回滚。

transactions spring-kafka spring-cloud-stream kafka-streams-binder
1个回答
0
投票

不支持使用您在 Kafka Streams 函数中描述的

@Transactional
。正如您所说,该方法在创建过程中仅调用一次,其余的直接委托给 Kafka Streams。但是,您可以将与数据库交互的代码提取到一个单独的方法中,然后用
@Transactional
标记它,然后提供一个数据库事务管理器,例如 JPA 管理器。当此隔离事务方法引发异常时,数据库事务将回滚,并且异常将传播到 Kafka Streams 处理器。这个博客有一些关于这个特定用例的更多细节。

© www.soinside.com 2019 - 2024. All rights reserved.