我有简单的 Outlook 邮件接收器,带有轮询器和任务执行器。 每 25 秒轮询一次固定延迟 每次轮询的最大消息数 2 最大获取大小 1
邮箱有8条信息
第一次轮询需要 2 条消息,并在线程 1 中逐条获取。此处理总共在 10 秒内完成
25 秒后,发生第二次轮询,再次发送 2 条消息,并进行与第一次轮询类似的处理。在这里我看到了线程 2。
其他消息的处理方式类似。
我认为在基于数字消息的第一次调用中,将产生多个线程(基于核心大小),并且每个线程将同时轮询。
但我没有看到并行性。那么poller上的任务执行器有什么用呢?
对于我的配置,我可以使用最大消息来轮询并获取大小(如果在下一次轮询 25 秒之前完成)。我说得对吗?
如果一次轮询中的消息处理时间超过 25 秒,第二次轮询将开始或将等待第一次轮询完成,会发生什么情况?
里面的逻辑是这样的:
private Runnable createPoller() {
return () ->
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (this.maxMessagesPerPoll == 0) {
logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
break;
}
if (pollForMessage() == null) {
break;
}
count++;
}
});
}
轮询器上的任务执行器的目的是在单独的线程上执行轮询任务,并且它确实对所有
maxMessagesPerPoll
执行此操作。
目标是将调度与处理分开。这样我们就可以让
TaskScheduler
尽可能摆脱“艰苦”的工作。
如果您想并行消息处理,请查看
ExecutorChannel
。这样,您就不需要在轮询器上使用 taskExecutor
,因为我们刚刚讨论的循环将非常“轻”,并且会将工作转移到提到的通道。
独立于执行器配置,处理消息多长时间并不重要:我们只是循环并离开预定的线程。因此,下一个轮询任务将在上一个轮询任务完成 25 秒后执行。