spring 集成轮询器与调度程序

问题描述 投票:0回答:1

我正在尝试使用 spring 集成设置一个简单的应用程序。目标是简单地使用文件入站通道适配器来监视目录中的新文件并在添加文件时对其进行处理。为简单起见,目前处理文件只是记录一些输出(正在处理的文件的名称)。不过,我确实想以多线程方式处理文件。假设选取了 10 个文件,应该并行处理,一旦这些文件完成,我们才会继续处理接下来的 10 个文件。

为此,我尝试了两种不同的方法,两者的工作原理似乎相似,我想了解使用轮询器或调度程序进行此类操作之间的差异。

方法#1 - 使用轮询器

<int-file:inbound-channel-adapter id="filesIn" directory="in">
        <int:poller fixed-rate="1" task-executor="executor" />
</int-file:inbound-channel-adapter>
    
<int:service-activator ref="moveToStage" method="move" input-channel="filesIn" />

<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="DISCARD" />

因此,据我了解,我们的想法是,我们不断轮询目录,一旦收到文件,就会将其发送到 filesIn 通道,直到达到池限制。然后,直到池被占用为止,即使我假设轮询仍在后台继续,也不会发送其他文件。这似乎有效,但我不确定使用每次轮询的最大消息是否有助于降低轮询频率。通过将每个轮询的最大消息设置为接近池大小。

方法 #2 - 使用调度程序

<int-file:inbound-channel-adapter id="filesIn" directory="in">
    <int:poller fixed-rate="5000" max-messages-per-poll="3" />
</int-file:inbound-channel-adapter>

<int:bridge input-channel="filesIn" output-channel="filesReady" />

<int:channel id="filesReady">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:service-activator ref="moveToStage" method="move" input-channel="filesInReady" />

<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="CALLER_RUNS" />

好吧,这里轮询器没有使用执行器,所以我假设它以顺序方式进行轮询。每个 poll 3 文件都应该被拾取,然后发送到 filesReady 通道,然后使用调度程序将文件传递到服务激活器,并且因为它使用调度程序的执行器,所以它立即返回控制并允许 filesIn 通道发送更多文件。

我想我的问题是我是否正确理解这两种方法以及一种方法是否比另一种更好。

谢谢

spring-integration
1个回答
8
投票

是的,你的理解是正确的。

一般来说,我会说每毫秒轮询一次(并在队列已满时丢弃轮询)是对资源(CPU 和 I/O)的浪费。

此外,在第一种情况下增加每次轮询的最大消息数也无济于事,因为轮询是在执行程序线程上完成的(调度程序将轮询交给执行程序,该线程将处理

mmpp
)。

在第二种情况下,由于调度程序线程在轮询期间(而不是之前)进行切换,因此

mmpp
将按预期工作。

因此,一般来说,您的第二种实现是最好的(只要您可以忍受新文件到达时平均 2.5 秒的延迟)。

© www.soinside.com 2019 - 2024. All rights reserved.