spring-Integration - http 入站网关响应和异步处理 - 任务执行器线程仅处理一条消息而不是所有消息

问题描述 投票:0回答:1

描述:

我们有一个 Spring 集成流程,我们希望配置一个端点,分割有效负载,返回给消费者,即使用 http 代码 202,其他项目必须继续异步处理。

配置说明:

我们有一个

<int-http:inbound-gateway>
。我们拆分
<int:splitter>
传入的有效负载。
splitter
始终创建一个列表,列表中的第一条消息是
MAIN
,然后接下来的将是
ITEM
消息。

在具有输入通道的

<int:header-value-router
中:
mainItemRouter-channel
MAIN
消息将应答接收者线程,以便 API 的使用者得到 202。即使使用者已经收到,
ITEM
消息也需要异步处理202.

问题:

如果我在拆分器之后有 25 条消息(1

MAIN
和 24
ITEM
)消息,则
taskExecutor
线程仅处理 10 条消息。这意味着每个线程仅处理一条消息,然后停止。

为什么

taskExecutor
线程不拾取其他 14 条消息来处理?

所有 24 条

ITEM
消息是否都到达执行者通道?

这是配置:

<beans>
    <int:channel id="postInboundReply-channel"/>
    <int:header-enricher input-channel="postFinish-channel" output-channel="postInboundReply-channel">
        <int:header name="myHeader" overwrite="true"
                    expression="setMyHeader"/>
    </int:header-enricher>

    <!-- Endpoint -->
    <int-http:inbound-gateway id="postInbound-gateway"
                              request-channel="postInboundRequest-channel"
                              reply-channel="postInboundReply-channel"
                              supported-methods="POST"
                              path="/api/tests"
                              request-payload-type="java.lang.String"
                              mapped-request-headers="content-type,Authorization"
                              mapped-response-headers="Content-Type,Location,myHeader"
                              >
        <int-http:request-mapping consumes="application/json" produces="application/json"/>
    </int-http:inbound-gateway>


    <!-- split payload -->
    <int:splitter id='splitter' ref='splitterObject' method='split'
                  input-channel='postInboundRequest-channel' output-channel='mainRouter-channel'/>

    <!--  MAIN gives the response to the http request, ITEM must continue asychronously -->
    <int:header-value-router input-channel="mainRouter-channel"  default-output-channel="executor-channel" header-name="myType">
        <int:mapping value="MAIN" channel="httpResponse-channel"/>
        <int:mapping value="ITEM" channel="executor-channel"/>
        <int:mapping value="ERROR" channel="postFinish-channel"/>
    </int:header-value-router>

    <int:transformer input-channel="httpResponse-channel"
                     output-channel="postFinish-channel"
                     ref="responderObject"/>

    <!-- execute items in parallel -->
    <task:executor id="taskExecutor" pool-size="10" rejection-policy="DISCARD"/>
    <int:channel id="executor-channel">
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>

    <int:transformer input-channel="executor-channel"
                     output-channel="afterTransformation1-channel"
                     ref="aTransformationBean"/>

    <int:transformer input-channel="afterTransformation1-channel"
                     output-channel="afterTransformation2-channel"
                     ref="aTransformationBean2"/>

    <!-- ITEM continues, ERROR has finished -->
    <int:header-value-router input-channel="afterTransformation2-channel" default-output-channel="nullChannel" header-name="myType">
        <int:mapping value="ITEM" channel="ItemAsyncPrcGatewayChain-channel"/>
        <int:mapping value="ERROR" channel="nullChannel"/>
    </int:header-value-router>

    <!-- wrap the whole async  process in a chain in order to catch exceptions in the error-channel -->
    <int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
      <int:gateway id="itemAsyncPrcGateway" request-channel="itemAsyncProcess-channel" error-channel="asyncProcessError-channel"/>
    </int:chain>

    <int:transformer input-channel="asyncProcessError-channel"
                     ref="asyncExceptionHandler"/>

    <int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">
        <int:transformer ref="asyncTransformerBean"/>
        <int:transformer ref="asyncSendJmsMessageBean"/>
        <int:transformer ref="asyncStoreStateBean"/>
    </int:chain>

</beans>

使用不同的配置运行多次测试但没有成功

java spring multithreading asynchronous spring-integration
1个回答
0
投票

好的。我看到你的问题了。

您在

<int:gateway id="itemAsyncPrcGateway">
中执行
<chain>
。这是关于请求-回复的。其合同基于:

@FunctionalInterface
public interface RequestReplyExchanger {

    Message<?> exchange(Message<?> request) throws MessagingException;

}

但是,在该网关的子流程结束时,您会执行以下操作:

<int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">

因此您不会向该网关返回任何回复。这可能是您在每个项目流程结束时所期望的,但是中间的

<gateway>
是错误的。

如何摆脱它

<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
并将该项目直接路由到
itemAsyncProcess-channel

重点是网关正在等待回复,从而阻塞了它工作的线程。

© www.soinside.com 2019 - 2024. All rights reserved.