限制在ActiveMq的主题中向其传递消息的最大订户数

问题描述 投票:0回答:1

假设有一个ActiveMq实例,有100个客户端可以收听主题。当在主题中发布新消息时,是否可以限制接收它的订户数量(例如仅10个)?

如果现在其他Messaging Component能够做到这一点,或者有一个解决方案/整合的最佳实践?

jms activemq message-queue jms-topic
1个回答
0
投票

它不是可配置的开箱即用,但你可以创建一个插件来做这样的事情非常简单。

一个插件拦截addConsumer并抛出一些SecurityException如果已经有太多订阅者。它也可以从activemq.xml配置中进行配置。

请注意,这是一个快速而肮脏的示例,您可能需要对其进行增强。

max subscribers plugin.Java

import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;

import java.util.Map;

public class MaxSubscribersPlugin extends BrokerPluginSupport{

    private int maxSubscribers = 1000;

    @Override
    public Subscription addConsumer(ConnectionContext ctx, ConsumerInfo info) throws Exception {
        ActiveMQDestination dest = info.getDestination();
        if ( dest.isTopic()  && !dest.getQualifiedName().contains("Advisory")) { // TODO better way to filter out Advisories
            Map<ActiveMQDestination, Destination> destinations =  ctx.getBroker().getBrokerService()
                                                                    .getAdminView().getBroker().getDestinationMap();
            Destination activeTopic = destinations.get(info.getDestination());
            if( activeTopic != null && activeTopic.getConsumers().size() >= getMaxSubscribers() ) {

                throw new SecurityException("Too many active subscribers on topic "
                        + info.getDestination().getPhysicalName()
                        + ". Max is " + getMaxSubscribers());
            }
        }

        return super.addConsumer(ctx, info);
    }

    public int getMaxSubscribers() {
        return maxSubscribers;
    }

    public void setMaxSubscribers(int maxSubscribers) {
        this.maxSubscribers = maxSubscribers;
    }

}

将其打包在.jar中并将其放入ActiveMQ的lib文件夹中。然后你可以配置它。

    <plugins>
          <bean xmlns="http://www.springframework.org/schema/beans" id="throttler"
              class="my.package.MaxSubscribersPlugin">
            <property name="maxSubscribers" value="10"/>
        </bean>
    </plugins>

然后,如果有太多的消费者联系 - 他们会得到一个例外:javax.jms.JMSSecurityException: Too many active subscribers on topic topicX. Max is 10

ActiveMQ日志中也会有一个日志条目:

WARN | Security Error occurred on connection to: tcp://127.0.0.1:52335, Too many active subscribers on topic topicX. Max is 10

© www.soinside.com 2019 - 2024. All rights reserved.