我试图使用apache camel从activemq队列 "AMQ:ORIGIN "中读取消息。读取消息后,需要将其传递给两个不同的 "AMQ队列"。但是条件如下。
为了实现上述目标,我创建了两条路由。在第一条路由中,我从AMQ队列中读取消息,并向 "AMQ:A "和 "seda:delay "队列进行组播。在第二条路由中,我从 "seda:delay "队列中读取信息,延迟一分钟,然后传递到 "AMQ:B "队列。
如果传递1或10个消息到 "AMQ:ORIGIN "队列,工作正常。
如果我同时发送100条信息到 "AMQ:ORIGIN "队列,那么
以下是我的路由。
<route id="read-origin">
<from uri="activemq:ORIGIN"/>
<multicast stopOnException="true">
<to uri="activemq:A"/>
<to uri="seda:delay-route"/>
</multicast>
</route>
<route id="delay-route">
<from uri="seda:delay-route"/>
<delay asyncDelayed="true">
<constant>60000</constant>
</delay>
<to uri="activemq:B"/>
</route>
请提出修改建议以达到上述目的。
谢谢。
这似乎是显而易见的,因为你延迟 每条信息1分钟.
如果你发送100条信息到 ORIGIN
队列,它需要 100分钟 直到所有这些信息到达队列 B
.
第一条消息立即被消耗,并延迟1分钟。第二条消息在第一条消息送达时被接收(假设seda队列中有1个消费者),同样延迟1分钟,以此类推......
我假设你希望一条消息 已经排队等待了1分钟,消耗完后可以立即送达。.
你可以很容易地达到这一点,使延迟动态。
实现一个Bean,它可以计算出与 JMSTimestamp
消息的头(查询时间)和当前时间。
currentTime - JMSTimestamp
=已经等待
Your minimal delay - alreadyWaited
= 发送前的等待时间(对于排队时间超过延迟的消息,负值取0)
使用这个差值作为延迟的值(我使用Java DSL,因为我更了解它)。
from("seda:delay-route").routeId("delay-route")
.delay().expression(method(YourDelayCalculationBean.class))
.to("activemq:B");
像这样,如果你的消息堆积在队列中,它们可能都会在消费时立即送达,因为它们已经在队列中等待了1分钟以上。
由于评论而增加的内容
好吧,对不起,我没有发现 asyncDelayed
.
医生是怎么说的 asyncDelayed
听起来像你所期望的。但根据你的评论,它听起来像 延迟EIP 不再阻挡消费者,而是阻挡自己。
所以seda消费者接收到一条消息后,将其交给Delay,然后继续下一条消息。10条消息后(10个线程是Camel默认的线程池大小),Delay就 "满了"(所有线程在固定的1分钟延迟中被阻塞)。
因此消费者成为阻塞,因为Delay不能再接受任何消息。1分钟后,Delay可以传递第一批消息,然后继续。
这只是根据你写的内容胡乱猜测你的路由是如何表现的。