如果使用延迟,Apache camel没有将所有的消息传送到AMQ。

问题描述 投票:0回答:1

我试图使用apache camel从activemq队列 "AMQ:ORIGIN "中读取消息。读取消息后,需要将其传递给两个不同的 "AMQ队列"。但是条件如下。

  1. 消息应该立即传到 "AMQ:A "队列。
  2. 延迟一分钟后,消息应该传到 "AMQ:B "队列。

为了实现上述目标,我创建了两条路由。在第一条路由中,我从AMQ队列中读取消息,并向 "AMQ:A "和 "seda:delay "队列进行组播。在第二条路由中,我从 "seda:delay "队列中读取信息,延迟一分钟,然后传递到 "AMQ:B "队列。

如果传递1或10个消息到 "AMQ:ORIGIN "队列,工作正常。

如果我同时发送100条信息到 "AMQ:ORIGIN "队列,那么

  1. 所有100条信息都会被送到 "AMQ:A "队列中。
  2. 只有10或12条信息被送到 "AMQ:B "队列中。其余的只停留在路由中。

以下是我的路由。

        <route id="read-origin">
            <from uri="activemq:ORIGIN"/>
            <multicast stopOnException="true">
                <to uri="activemq:A"/>
                <to uri="seda:delay-route"/>
            </multicast>
        </route>

        <route id="delay-route">
            <from uri="seda:delay-route"/>
            <delay asyncDelayed="true">
                <constant>60000</constant>
            </delay>
            <to uri="activemq:B"/>
        </route>

请提出修改建议以达到上述目的。

谢谢。

apache-camel
1个回答
0
投票

这似乎是显而易见的,因为你延迟 每条信息1分钟.

如果你发送100条信息到 ORIGIN 队列,它需要 100分钟 直到所有这些信息到达队列 B.

第一条消息立即被消耗,并延迟1分钟。第二条消息在第一条消息送达时被接收(假设seda队列中有1个消费者),同样延迟1分钟,以此类推......

我假设你希望一条消息 已经排队等待了1分钟,消耗完后可以立即送达。.

你可以很容易地达到这一点,使延迟动态。

实现一个Bean,它可以计算出与 JMSTimestamp 消息的头(查询时间)和当前时间。

currentTime - JMSTimestamp =已经等待

Your minimal delay - alreadyWaited = 发送前的等待时间(对于排队时间超过延迟的消息,负值取0)

使用这个差值作为延迟的值(我使用Java DSL,因为我更了解它)。

from("seda:delay-route").routeId("delay-route")
    .delay().expression(method(YourDelayCalculationBean.class))
    .to("activemq:B");

像这样,如果你的消息堆积在队列中,它们可能都会在消费时立即送达,因为它们已经在队列中等待了1分钟以上。

由于评论而增加的内容

好吧,对不起,我没有发现 asyncDelayed.

医生是怎么说的 asyncDelayed 听起来像你所期望的。但根据你的评论,它听起来像 延迟EIP 不再阻挡消费者,而是阻挡自己。

所以seda消费者接收到一条消息后,将其交给Delay,然后继续下一条消息。10条消息后(10个线程是Camel默认的线程池大小),Delay就 "满了"(所有线程在固定的1分钟延迟中被阻塞)。

因此消费者成为阻塞,因为Delay不能再接受任何消息。1分钟后,Delay可以传递第一批消息,然后继续。

这只是根据你写的内容胡乱猜测你的路由是如何表现的。

© www.soinside.com 2019 - 2024. All rights reserved.