Spring Integration流程调用REST服务

问题描述 投票:0回答:1

我在我的项目中定义了以下集成流程

///

public IntegrationFlow acarsEventFlow() {
    return IntegrationFlows
            //.from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //Get Message from MQ
            .from(org.springframework.integration.jms.dsl.Jms.messageDrivenChannelAdapter(
                    org.springframework.integration.jms.dsl.Jms.container(this.acarsMqConnectionFactory, this.acarsQueue)
                    .transactionManager(transactionManager(this.acarsMqConnectionFactory))
                    .get()))
            .wireTap(ACARS_WIRE_TAP_CHNL) 
            .transform(agmTransformer, "parseXMLMessage") //
            .handle(acarsProcessor, "pushRawMessage") // (1)Call web service to push the message payload and if it fails then don't commit the transaction and rollback the message
            .transform(agmTransformer, "populateSmi") 
            .filter(acarsFilter,"filterMessageOnSmi") // 
            .transform(agmTransformer, "populateImi") //
            .filter(acarsFilter,"filterMessageOnSmiImi") //
            .transform(acarsProcessor,"processEvent") //
            .publishSubscribeChannel(pubSub -> pubSub
                    .subscribe(flow -> flow
                        .bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
                        .enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNbr")) //Add flight number as key
                        .transform("payload.message") // publish the transformed message
                        .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(acarsKafkaTopic))) //publish to kafka
                    .subscribe(flow -> flow
                        .channel(UPDATE_DATA_STORE_CHNL))) 
            .get(); 

}

///

我从MQ获取消息,启动事务管理器以确保消息被回滚,除非它被处理。现在在其中一个句柄方法#pushRawMessage()[请参阅注释(1)调用Web服务以在上面的代码片段中推送消息有效负载]我需要调用webservice。目前我只是从处理程序内部调用web服务 - pushRawMessage()。引入Messaging Gateway来调用第三方Web服务是个好主意吗?如果我们引入一个mEssaging Gateway,那么当webservice关闭时,我们如何确保原始消息被回滚?

spring-integration spring-integration-dsl
1个回答
0
投票

这样就可以了。还可以使用.gateway()为该Web服务进程执行一些子流程。只要所有内容都在同一个线程中完成,当您只使用直接通道时,一切都将参与同一个事务。因此,该子流中的任何错误都将导致回滚事务。

您也可以将该Web服务进程作为异步,只要您使用gateway()即可。它会在当前线程中等待回复或错误。因此,交易将再次回滚。

© www.soinside.com 2019 - 2024. All rights reserved.