描述:
我们有一个 Spring 集成流程,我们希望配置一个端点,分割有效负载,返回给消费者,即使用 http 代码 202,其他项目必须继续异步处理。
配置说明:
我们有一个
<int-http:inbound-gateway>
。我们拆分 <int:splitter>
传入的有效负载。 splitter
始终创建一个列表,列表中的第一条消息是 MAIN
,然后接下来的将是 ITEM
消息。
在具有输入通道的
<int:header-value-router
中: mainItemRouter-channel
MAIN
消息将应答接收者线程,以便 API 的使用者得到 202。即使使用者已经收到,ITEM
消息也需要异步处理202.
问题:
如果我在拆分器之后有 25 条消息(1
MAIN
和 24 ITEM
)消息,则 taskExecutor
线程仅处理 10 条消息。这意味着每个线程仅处理一条消息,然后停止。
为什么
taskExecutor
线程不拾取其他 14 条消息来处理?
所有 24 条
ITEM
消息是否都到达执行者通道?
这是配置:
<beans>
<int:channel id="postInboundReply-channel"/>
<int:header-enricher input-channel="postFinish-channel" output-channel="postInboundReply-channel">
<int:header name="myHeader" overwrite="true"
expression="setMyHeader"/>
</int:header-enricher>
<!-- Endpoint -->
<int-http:inbound-gateway id="postInbound-gateway"
request-channel="postInboundRequest-channel"
reply-channel="postInboundReply-channel"
supported-methods="POST"
path="/api/tests"
request-payload-type="java.lang.String"
mapped-request-headers="content-type,Authorization"
mapped-response-headers="Content-Type,Location,myHeader"
>
<int-http:request-mapping consumes="application/json" produces="application/json"/>
</int-http:inbound-gateway>
<!-- split payload -->
<int:splitter id='splitter' ref='splitterObject' method='split'
input-channel='postInboundRequest-channel' output-channel='mainRouter-channel'/>
<!-- MAIN gives the response to the http request, ITEM must continue asychronously -->
<int:header-value-router input-channel="mainRouter-channel" default-output-channel="executor-channel" header-name="myType">
<int:mapping value="MAIN" channel="httpResponse-channel"/>
<int:mapping value="ITEM" channel="executor-channel"/>
<int:mapping value="ERROR" channel="postFinish-channel"/>
</int:header-value-router>
<int:transformer input-channel="httpResponse-channel"
output-channel="postFinish-channel"
ref="responderObject"/>
<!-- execute items in parallel -->
<task:executor id="taskExecutor" pool-size="10" rejection-policy="DISCARD"/>
<int:channel id="executor-channel">
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:transformer input-channel="executor-channel"
output-channel="afterTransformation1-channel"
ref="aTransformationBean"/>
<int:transformer input-channel="afterTransformation1-channel"
output-channel="afterTransformation2-channel"
ref="aTransformationBean2"/>
<!-- ITEM continues, ERROR has finished -->
<int:header-value-router input-channel="afterTransformation2-channel" default-output-channel="nullChannel" header-name="myType">
<int:mapping value="ITEM" channel="ItemAsyncPrcGatewayChain-channel"/>
<int:mapping value="ERROR" channel="nullChannel"/>
</int:header-value-router>
<!-- wrap the whole async process in a chain in order to catch exceptions in the error-channel -->
<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
<int:gateway id="itemAsyncPrcGateway" request-channel="itemAsyncProcess-channel" error-channel="asyncProcessError-channel"/>
</int:chain>
<int:transformer input-channel="asyncProcessError-channel"
ref="asyncExceptionHandler"/>
<int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">
<int:transformer ref="asyncTransformerBean"/>
<int:transformer ref="asyncSendJmsMessageBean"/>
<int:transformer ref="asyncStoreStateBean"/>
</int:chain>
</beans>
使用不同的配置运行多次测试但没有成功
好的。我看到你的问题了。
您在
<int:gateway id="itemAsyncPrcGateway">
中执行 <chain>
。这是关于请求-回复的。其合同基于:
@FunctionalInterface
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
但是,在该网关的子流程结束时,您会执行以下操作:
<int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">
因此您不会向该网关返回任何回复。这可能是您在每个项目流程结束时所期望的,但是中间的
<gateway>
是错误的。
如何摆脱它
<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
并将该项目直接路由到 itemAsyncProcess-channel
?
重点是网关正在等待回复,从而阻塞了它工作的线程。