描述:
我们有一个 Spring 集成流程,我们希望配置一个端点,分割有效负载,并行执行一些转换,然后在经过一些验证后,一部分返回给消费者,即使用 http 代码 202,而其他项目必须继续异步处理。
配置说明:
我们有一个
<int-http:inbound-gateway>
。我们拆分 <int:splitter>
传入的有效负载以并行执行一些转换 <int:dispatcher task-executor="taskExecutor"/>
,然后聚合 <int:aggregator>
来分析状态。
如果错误,我们返回错误状态,如果没有,我们再次分裂。
在
<int:header-value-router
带输入通道: mainItemRouter-channel
MAIN
消息将回复接收者线程,以便 API 的使用者得到 202。
即使消费者已经收到 202,ITEM
消息也需要异步处理。
据我们所知,配置有效,但对于我们处理的每条异步消息,我们都有一个警告日志:
"level":"WARN","loggerName":"org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel","message":"Reply message received but the receiving thread has already received a reply: ... "
:
为什么我们会收到此警告?
配置错误吗?
我们在配置上遗漏了什么吗?
这是配置:
<beans>
<int:channel id="postInboundReply-channel"/>
<int:channel id="postAggregate-channel"/>
<int:channel id="postFinish-channel">
<int:header-enricher input-channel="postFinish-channel" output-channel="postInboundReply-channel">
<int:header name="myHeader" overwrite="true"
expression="setMyHeader"/>
</int:header-enricher>
<!-- Endpoint -->
<int-http:inbound-gateway id="postBulkInbound-gateway"
request-channel="postInboundRequest-channel"
reply-channel="postInboundReply-channel"
supported-methods="POST"
path="/api/tests"
request-payload-type="java.lang.String"
mapped-request-headers="content-type,Authorization"
mapped-response-headers="Content-Type,Location,myHeader"
reply-timeout="100000">
<int-http:request-mapping consumes="application/json" produces="application/json"/>
</int-http:inbound-gateway>
<!-- split payload -->
<int:splitter id='splitter' ref='splitterObject' method='split'
input-channel='postInboundRequest-channel' output-channel='mainRouter-channel'/>
<!-- MAIN goes directly to aggregator, ERROR goes to the end -->
<int:header-value-router input-channel="mainRouter-channel" default-output-channel="executor-channel" header-name="myType">
<int:mapping value="MAIN" channel="postAggregate-channel"/>
<int:mapping value="ITEM" channel="executor-channel"/>
<int:mapping value="ERROR" channel="postFinish-channel"/>
</int:header-value-router>
<!-- execute items in parallel -->
<task:executor id="taskExecutor" pool-size="10" rejection-policy="DISCARD"/>
<int:channel id="executor-channel">
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:transformer input-channel="executor-channel"
output-channel="postAggregate-channel"
ref="aTransformationBean"/>
<!-- Checkpoint: items could be transformed -->
<int:aggregator
input-channel="postAggregate-channel"
ref="transformationAggregator"
method="aggregate"
output-channel="afterAggregation-channel"
release-lock-before-send="true"
/>
<!-- before we need to check if one has transform error if yes return 400 else continue -->
<int:header-value-router resolution-required="false" input-channel="afterAggregation-channel" default-output-channel="afterAggregationNonError-channel" header-name="myType">
<int:mapping value="ERROR" channel="afterAggregationWithError-channel"/>
</int:header-value-router>
<int:transformer input-channel="afterAggregationWithError-channel"
output-channel="postFinish-channel"
ref="errorTransformer"/>
<!-- split for further processing -->
<int:splitter input-channel='afterAggregationNonError-channel' ref='processingSplitter' output-channel='mainItemRouter-channel'/>
<!-- MAIN goes back to the inbound caller thread, ITEM processing asynchronouns -->
<int:header-value-router input-channel="mainItemRouter-channel" header-name="myType">
<int:mapping value="MAIN" channel="mainReply-channel"/>
<int:mapping value="ITEM" channel="asyncProcessingGatewayChain-channel"/>
</int:header-value-router>
<int:transformer input-channel="mainReply-channel"
output-channel="postFinish-channel"
ref="responderBean"/>
<!-- execute parallel -->
<task:executor id="processingExecutor" pool-size="10" rejection-policy="DISCARD" />
<int:channel id="asyncProcessingGatewayChain-channel">
<int:dispatcher task-executor="processingExecutor"/>
</int:channel>
<!-- wrap the whole async process in a chain in order to catch exceptions in the error-channel -->
<int:chain input-channel="asyncProcessingGatewayChain-channel">
<int:gateway id="asyncProcessingGateway" request-channel="asyncProcessing-channel" error-channel="asyncProcessingError-channel"/>
</int:chain>
<int:chain input-channel="asyncProcessing-channel">
<int:transformer ref="asyncTransformerBean"/>
<int:transformer ref="asyncSendJmsMessageBean"/>
<int:transformer ref="asyncStoreStateBean"/>
</int:chain>
<int:transformer input-channel="asyncProcessingError-channel"
ref="asyncProcessingExceptionHandler"/>
</beans>
一个网关对于每个请求只能收到一个回复;因此出现警告日志;您的异步流程也会返回回复。
将异步流(链)的
output-channel
发送到 nullChannel
。
或者,使最后一个组件 (
asyncStoreStateBean
) 成为具有 service-activator
返回的 void
- 这也将阻止回复返回到网关。变压器必须返回一些东西。