我使用activemq(5.14.5)
和camel(2.13.4)
,因为我仍然需要java 6
。我有一个队列和15个消费者。发送给他们的消息是请求回复。
当我启动消费者时,消息一旦消息到达就会按消费者分发,但是一段时间后,只有一个消费者收到消息,其他消息保持闲置状态,许多消息保持待定状态。
消费者有这样的配置:
concurrentConsumers=15&maxMessagesPerTask=1&destination.consumer.prefetchSize=0&transferException=true
由于我们的业务规则,处理每条消息所花费的时间可能会有很大差异,因此,我不知道activemq
是否有一些规则可以管理缓慢的消费者并重定向到只有一个更“高效”的消费者。
我期待的行为是所有到达的消息,开始处理直到所有消费者都满了,但事实并非如此。
谁知道发生了什么?
您的配置有两个引人注目的设置:
maxMessagesPerTask=1
如果您不打算配置自动缩放线程池,则应完全删除此设置。默认情况下是无限制的,它设置保持线程处理的时间(向上/向下扩展线程池)。
另见Spring Docs about this setting
prefetchSize=0
您是否尝试将此设置为1,以便每个消费者一次只能获得1条消息?
AMQ docs说prefetchSize:
建议使用较大的预取值以实现高消息量的高性能。但是,对于较低的消息量,每个消息需要很长时间才能处理,预取应设置为1.这可确保消费者一次只处理一条消息。但是,将预取限制指定为零将导致使用者一次一个地轮询消息,而不是将消息推送到使用者。