使用 AMQP 建立与事件中心的连接时如何设置 x-opt-offset 以避免消息重播

问题描述 投票:0回答:3

我的应用程序连接到azure事件中心以接收消息并处理它们。我发现每次重新启动应用程序时,保留期内的所有消息都会重播。我阅读了有关偏移量的信息以避免此问题,并且我有一种方法可以将与天蓝色事件中心的连接设置为:

    MessageConsumer connect() {
        // set up JNDI context
        BatchEventHubConfig batchEventHubConfig = //MAP CONTAINING CONFIG
        String queueName = "EventHub"
        String connectionFactoryName = "SBCF"
        //Long offset = batchAccountManager.batchStorageManager.batchJobMsgCheckpointService.get(batchEventHubConfig.namespace, batchEventHubConfig.getMessageQueueAddress(partitionInx, true))?.offset
        Hashtable<String, String> hashtable = new Hashtable<>()
        hashtable.put("connectionfactory.${connectionFactoryName}", batchEventHubConfig.getAMQPConnectionURI())
        hashtable.put("queue.${queueName}", batchEventHubConfig.getMessageQueueAddress(partitionInx))
        //hashtable.put("apache.org:selector-filter:string", "amqp.annotation.x-opt-offset > '${offset}'")
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory")
        Context context = new InitialContext(hashtable)

        ConnectionFactory factory = (ConnectionFactory) context.lookup(connectionFactoryName)
        queue = (Destination) context.lookup(queueName)
        connection = factory.createConnection(batchEventHubConfig.sasPolicyName, batchEventHubConfig.sasPolicyKey)
        connection.setExceptionListener(new BatchExceptionListener(eventHubConnection: this))

        connection.start()
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
        messageConsumer = session.createConsumer(queue)
        messageConsumer.setMessageListener(messageListener)
        messageConsumer
    }

注释掉的偏移代码是我在阅读完此处后尝试的代码:https://azure.github.io/amqpnetlite/articles/azure_eventhubs.html

设置偏移量以便应用程序重新启动时消息不会重新播放的正确方法是什么?

azure amqp azure-eventhub
3个回答
2
投票

Apche QPID 不支持 AMQP 过滤器(底层 Apache Proton J 支持)..

我通过在末尾添加以下行来修补 AmqpConsumerBuilder.configureSource() 方法:

Symbol filterKey = Symbol.valueOf("apache.org:selector-filter:string");
UnknownDescribedType filterValue = new UnknownDescribedType(filterKey, String.format("%s > '%s'",amqp.annotation.x-opt-offset", lastOffset));

filters.put(filterKey, filterValue);

并且有效!

因此,要么创建 Apache QPID 的分支并应用此补丁,要么将修改后的类放入类路径中以覆盖原始类(非常糟糕的解决方案)


0
投票

这比我想象的要容易得多!找到这个链接:https://timjansen.github.io/jarfiller/guide/jms/selectors.xhtml

所以我所要做的就是添加这样的过滤条件:

messageConsumer = session.createConsumer(queue, "amqp.annotation.x-opt-offset >= '${messageOffset}'")

0
投票

受到 Mourad Zouabi 答案的启发,我创建了 qpid-jms 存储库的一个分支,并按如下方式更改了 AmqpConsumerBuilder 类,以允许将过滤器传递给 MessageConsumer:

   if (resourceInfo.getSelector() != null && !resourceInfo.getSelector().trim().equals("")) {
        if (resourceInfo.getSelector().startsWith("x-opt-offset") || resourceInfo.getSelector().startsWith("x-opt-enqueued-time")) {
            // support Azure Event HUB filters
            // see: https://azure.github.io/amqpnetlite/articles/azure_eventhubs.html
            Symbol filterKey = Symbol.valueOf("apache.org:selector-filter:string");
            UnknownDescribedType filterValue = new UnknownDescribedType(filterKey,"amqp.annotation." + resourceInfo.getSelector().trim());
            filters.put(filterKey, filterValue);
        } else {
            filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resourceInfo.getSelector()));
        }
    }
    if (!filters.isEmpty()) {
        source.setFilter(filters);
    }

现在创建 MessageConsumer 时可以传递过滤器:

MessageConsumer consumer = session.createConsumer(queue,  String.format("x-opt-offset > %s", offset));

MessageConsumer consumer = session.createConsumer(queue,  String.format("x-opt-enqueued-time > %s", timeStamp));

请参阅 https://github.com/ekkelenkamp/qpid-jms 了解工作叉。

© www.soinside.com 2019 - 2024. All rights reserved.