入站通道适配器未根据执行程序缩放

问题描述 投票:0回答:1

我有jdbc:inbound-channel-adapter轮询50条记录。我试图通过将pollerExecutor池大小缩放到1-10来提高性能,以便多个线程可以处理50条记录:

    <int-jdbc:inbound-channel-adapter
    id="initial.ContactType.poller"
    query="${poller.ContactType.get}"
    max-rows="${poller.deliveryContactType.maxRow:50}"
    row-mapper="ContactTypePollerRowMapper"
    data-source="dataSource" channel="ContactTypeChannel">
    <int:poller  fixed-rate="3000" time-unit="MILLISECONDS" task-executor="pollerExecutor">
        <int:advice-chain>
            <ref bean="pollerLoggingAdvice"/>
            <ref bean="txAdvice"  />
        </int:advice-chain>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
<task:executor id="pollerExecutor" pool-size="1-10"
    queue-capacity="0" rejection-policy="CALLER_RUNS" />

我测试了处理100,000条记录所花费的时间是相同的,而不管池的大小如何。

我分别进行了三轮测试,其中pool-size = 1,pool-size = 1-3和pool-size = 1-10,在所有三个测试中,每次100,000条记录都花费了1个小时。

我通过检查日志确认pollerExecutor线程不能并行运行。在pollerExecutor-2开始处理之前,pollerExecutor-1将处理所有50条记录。

为什么容器/ pollerExecutor无法并行运行?

spring-integration threadpoolexecutor executor
1个回答
0
投票

我认为您的问题在这里:

/**
 * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
 * Default is {@code Integer.MAX_VALUE}.
 * <p>Any positive value will lead to a LinkedBlockingQueue instance;
 * any other value will lead to a SynchronousQueue instance.
 * @see java.util.concurrent.LinkedBlockingQueue
 * @see java.util.concurrent.SynchronousQueue
 */
public void setQueueCapacity(int queueCapacity) {

因此,如果您指定queue-capacity="0",则会得到SynchronousQueue,它不能接受新的并行任务,因为已经有一个忙于处理这50条记录。

尝试使用一些合理的queue-capacity来观察可能的并行性。

© www.soinside.com 2019 - 2024. All rights reserved.