我对 Spring 集成以及下面所述的几乎所有内容都是新手。 我有一个 inboundChannelAdapter 来从 pubsub 中的订阅中提取消息。 事实上,一旦订阅者(即)收到消息,它就会从 pubsub 中提取消息,并将其交给 @serviceactivator 方法。 我想将它包装在一个 cron 中,以便消息订阅仅在该时间间隔内发生(期望它会像使用 @Scheduled 的一般任务 cron 作业一样运行)。很自然地,我偶然发现了 poller。 我在这里面临一些问题并且超出了我的深度:
轮询频率没有按照指定的方式工作,无论是固定速率还是 cron 触发器 [我尝试使用固定速率 60000 1 分钟,并使用返回 cron 表达式的 bean 触发 =“cronTrigger”(* * * * * *)]
一旦应用程序启动,它就会开始拉取消息,而不是等待预定/固定时间。
3.我还有与此无关的@Scheduled cron 作业将要运行。所以我也不想遇到任何线程阻塞问题。
我还注意到,在拉出 1 条消息并将其移交给 serviceactivator 方法后,它会完成整个动作并完成该过程,然后收到下一条消息,这对于我的用例来说似乎并不理想。
对于我规定的用例使用轮询器是一个好习惯吗?
请建议应该采取什么措施来实现使用轮询器按计划频率通过通道适配器从 pubsub 中提取消息的行为。
我没有使用 IntegrationFlow,并且我尝试了如下在通道适配器 bean 上注释的操作;
@InboundChannelAdapter(channel="inboundchannel", poller = @Poller(fixedRate="60000", maxMessagesPerPoll="10")
它确实在应用程序启动后立即启动第一个轮询周期。只是没有任何先前的任务来计算当前的延迟。为此,
PeriodicTrigger
具有 initialDelay
属性。我们不会在 @Poller
注释中公开它,因为它是特定于触发器的,并且不适用于 CronTrigger
等其他触发器。您可能会考虑查看 fixedDelay
而不是 fixedRate
。有关这些属性的更多信息,请参阅 PeriodicTrigger
Javadocs。简而言之:fixedRate
在开始前一个任务后开始新任务,fixedDelay
仅在前一个任务完成后开始。
您有
maxMessagesPerPoll="10"
,这意味着在单个轮询周期中循环最多 10 条消息。只有在那之后,它才会进入下一个计划任务。
如果您想并行处理这些轮询消息,请考虑设置该
taskExecutor
注释的 @Poller
选项。
在文档中查看更多信息:
https://docs.spring.io/spring-integration/reference/polling-consumer.html