使用RabbitMQ插件在Grails中创建队列运行时

问题描述 投票:0回答:3

我有一个系统,外部系统可以订阅我的系统生成的事件。系统用Grails 2编写,使用RabbitMQ plugin进行内部消息传递。外部系统的事件通过HTTP传递。

我想为每个订户创建一个队列,以防止慢速订户端点减慢向其他订户的消息。订阅可以在运行时发生,这就是为什么在应用程序配置中定义队列是不可取的。

如何使用Grails RabbitMQ插件创建具有主题绑定运行时的队列?

由于从RabbitMQ队列读取消息直接耦合到服务,因此创建队列运行时的一个副问题可能是拥有该Grails服务的多个实例。有任何想法吗?

spring grails groovy rabbitmq amqp
3个回答
1
投票

我没有为你准备好的解决方案,但如果你遵循RabbitmqGrailsPlugin Descriptor中的代码,特别是doWithSpring部分你应该能够重新创建在运行时动态初始化新的Queue和相关的Listener所需的步骤。

这一切都归结为传递所需的参数,注册必要的spring bean并启动监听器。

为了回答你的第二个问题,我认为你可以提出一些命名约定并为每个队列创建一个新的队列处理程序。如何动态创建spring bean的示例可以在这里找到:dynamically declare beans

只是一个简短的例子,我如何快速注册一个队列,它需要更多的布线等...

def createQ(queueName) {
    def queuesConfig = {
        "${queueName}"(durable: true, autoDelete: false,)
    }
    def queueBuilder = new RabbitQueueBuilder()
    queuesConfig.delegate = queueBuilder
    queuesConfig.resolveStrategy = Closure.DELEGATE_FIRST
    queuesConfig()

    queueBuilder.queues?.each { queue ->
        if (log.debugEnabled) {
            log.debug "Registering queue '${queue.name}'"
        }
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(Queue.class);
        builder.addConstructorArgValue(queue.name)
        builder.addConstructorArgValue(Boolean.valueOf(queue.durable))
        builder.addConstructorArgValue(Boolean.valueOf(queue.exclusive))
        builder.addConstructorArgValue(Boolean.valueOf(queue.autoDelete))
        builder.addConstructorArgValue(queue.arguments)
        DefaultListableBeanFactory factory = (DefaultListableBeanFactory) grailsApplication.mainContext.getBeanFactory();
        factory.registerBeanDefinition("grails.rabbit.queue.${queue.name}", builder.getBeanDefinition());
    }
}

0
投票

我最终使用了Grails RabbitMQ插件使用的Spring AMQP。删除了一些方法/参数,因为它们与示例无关:

class MyUpdater {
  void handleMessage(Object message) {
    String content = new String(message)

    // do whatever you need with the message
  }
}


import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.amqp.rabbit.connection.ConnectionFactory

class ListenerInitiator {

  // autowired
  ConnectionFactory   rabbitMQConnectionFactory

  protected void initiateListener() {
    RabbitAdmin admin = new RabbitAdmin(rabbitMQConnectionFactory)

    // normally passed to this method, moved to local vars for simplicity
    String queueName = "myQueueName"
    String routingKey = "#"
    String exchange = "myExchange"

    Queue queue = new Queue(queueName)
    admin.declareQueue(queue)
    TopicExchange exchange = new TopicExchange(exchange)
    admin.declareExchange(exchange)

    admin.declareBinding( BindingBuilder.bind(queue).to(exchange).with(routingKey) )

    // normally passed to this method, moved to local var for simplicity
    MyUpdater listener = new MyUpdater()
    SimpleMessageListenerContainer container =
        new SimpleMessageListenerContainer(rabbitMQConnectionFactory)
    MessageListenerAdapter adapter = new MessageListenerAdapter(listener)
    adapter.setMessageConverter(new SimpleMessageConverter())

    container.setMessageListener(adapter)
    container.setQueueNames(queueName)
    container.start()
}

-1
投票

我强烈建议检查一下(https://www.infoq.com/presentations/api-io-state)。我为Grails 3.1提交了一个API插件,以便从控制器共享IO状态和抽象API功能,这样它就可以与分布式架构中的服务共享,但Grails团队似乎希望对社区应用不同的规则。 。

它会解决你的问题,但它可能永远不会被释放。

© www.soinside.com 2019 - 2024. All rights reserved.