Spring Integration - 获取入站网关响应的警告日志,然后继续在后台进行异步处理

问题描述 投票:0回答:1

描述:

我们有一个 Spring 集成流程,我们希望配置一个端点,分割有效负载,并行执行一些转换,然后在经过一些验证后,一部分返回给消费者,即使用 http 代码 202,而其他项目必须继续异步处理。

配置说明:

我们有一个

<int-http:inbound-gateway>
。我们拆分
<int:splitter>
传入的有效负载以并行执行一些转换
<int:dispatcher task-executor="taskExecutor"/>
,然后聚合
<int:aggregator>
来分析状态。

如果错误,我们返回错误状态,如果没有,我们再次分裂。

<int:header-value-router
带输入通道:
mainItemRouter-channel
MAIN
消息将回复接收者线程,以便 API 的使用者得到 202。 即使消费者已经收到 202,
ITEM
消息也需要异步处理。

据我们所知,配置有效,但对于我们处理的每条异步消息,我们都有一个警告日志:

"level":"WARN","loggerName":"org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel","message":"Reply message received but the receiving thread has already received a reply: ... "

为什么我们会收到此警告?

配置错误吗?

我们在配置上遗漏了什么吗?

这是配置:

<beans>
  
    <int:channel id="postInboundReply-channel"/>
    <int:channel id="postAggregate-channel"/>
    
    <int:channel id="postFinish-channel">
    <int:header-enricher input-channel="postFinish-channel" output-channel="postInboundReply-channel">
        <int:header name="myHeader" overwrite="true"
                    expression="setMyHeader"/>
    </int:header-enricher>


    <!-- Endpoint -->
    <int-http:inbound-gateway id="postBulkInbound-gateway"
                              request-channel="postInboundRequest-channel"
                              reply-channel="postInboundReply-channel"
                              supported-methods="POST"
                              path="/api/tests"
                              request-payload-type="java.lang.String"
                              mapped-request-headers="content-type,Authorization"
                              mapped-response-headers="Content-Type,Location,myHeader"
                              reply-timeout="100000">
        <int-http:request-mapping consumes="application/json" produces="application/json"/>
    </int-http:inbound-gateway>

    <!-- split payload -->
    <int:splitter id='splitter' ref='splitterObject' method='split'
                  input-channel='postInboundRequest-channel' output-channel='mainRouter-channel'/>

    <!-- MAIN goes directly to aggregator, ERROR goes to the end -->
    <int:header-value-router input-channel="mainRouter-channel"  default-output-channel="executor-channel" header-name="myType">
        <int:mapping value="MAIN" channel="postAggregate-channel"/>
        <int:mapping value="ITEM" channel="executor-channel"/>
        <int:mapping value="ERROR" channel="postFinish-channel"/>
    </int:header-value-router>

    <!-- execute items in parallel -->
    <task:executor id="taskExecutor" pool-size="10" rejection-policy="DISCARD"/>
    <int:channel id="executor-channel">
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>

    <int:transformer input-channel="executor-channel"
                     output-channel="postAggregate-channel"
                     ref="aTransformationBean"/>

    <!-- Checkpoint: items could be transformed -->
    <int:aggregator
      input-channel="postAggregate-channel"
      ref="transformationAggregator"
      method="aggregate"
      output-channel="afterAggregation-channel"
      release-lock-before-send="true"
    />

    <!-- before we need to check if one has transform error if yes return 400 else continue -->
    <int:header-value-router resolution-required="false" input-channel="afterAggregation-channel" default-output-channel="afterAggregationNonError-channel" header-name="myType">
        <int:mapping value="ERROR" channel="afterAggregationWithError-channel"/>
    </int:header-value-router>

    <int:transformer input-channel="afterAggregationWithError-channel"
                output-channel="postFinish-channel"
                ref="errorTransformer"/>

    <!-- split for further processing -->
    <int:splitter input-channel='afterAggregationNonError-channel' ref='processingSplitter' output-channel='mainItemRouter-channel'/>

    <!-- MAIN goes back to the inbound caller thread, ITEM processing asynchronouns -->
    <int:header-value-router input-channel="mainItemRouter-channel"  header-name="myType">
        <int:mapping value="MAIN" channel="mainReply-channel"/>
        <int:mapping value="ITEM" channel="asyncProcessingGatewayChain-channel"/>
    </int:header-value-router>

    <int:transformer input-channel="mainReply-channel"
                     output-channel="postFinish-channel"
                     ref="responderBean"/>

    <!-- execute parallel -->
    <task:executor id="processingExecutor" pool-size="10" rejection-policy="DISCARD" />
    <int:channel id="asyncProcessingGatewayChain-channel">
        <int:dispatcher task-executor="processingExecutor"/>
    </int:channel>

    <!-- wrap the whole async  process in a chain in order to catch exceptions in the error-channel -->
    <int:chain input-channel="asyncProcessingGatewayChain-channel">
      <int:gateway id="asyncProcessingGateway" request-channel="asyncProcessing-channel" error-channel="asyncProcessingError-channel"/>
    </int:chain>

    <int:chain input-channel="asyncProcessing-channel">
        <int:transformer ref="asyncTransformerBean"/>
        <int:transformer ref="asyncSendJmsMessageBean"/>
        <int:transformer ref="asyncStoreStateBean"/>
    </int:chain>
    
    <int:transformer input-channel="asyncProcessingError-channel"
                     ref="asyncProcessingExceptionHandler"/>
</beans>
java spring multithreading asynchronous spring-integration
1个回答
0
投票

一个网关对于每个请求只能收到一个回复;因此出现警告日志;您的异步流程也会返回回复。

将异步流(链)的

output-channel
发送到
nullChannel

或者,使最后一个组件 (

asyncStoreStateBean
) 成为具有
service-activator
返回的
void
- 这也将阻止回复返回到网关。变压器必须返回一些东西。

© www.soinside.com 2019 - 2024. All rights reserved.