动态队列和侦听器,消息未发送?

问题描述 投票:0回答:1

兔子配置:

package com.rabbitMQ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.net.URISyntaxException;

@EnableRabbit
@Configuration
public class RabbitMqConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;


    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


}

应用程序概述:每当gitRepository连接到我们的应用程序时,存储库名称都将成为交换名称,在这种情况下为ForceCI,则该存储库中的每个分支都会创建自己的队列,这里有两个队列developmaster。现在,每次在开发分支中创建拉取请求时,我都需要传递信息以开发队列,并且应该由特定的侦听器侦听该侦听器,该侦听器仅应为开发注册。我看到了有关动态队列的示例,但似乎无法找到有关如何创建将在不同线程中执行的动态侦听器的任何示例,如何实现呢?另外,我正在尝试发送一些消息作为测试排队,但是我无法在控制台中看到它们。 (下面的代码)

enter image description here

@RequestMapping(value = "/createExchange", method = RequestMethod.GET)
public void createExchange(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    rabbitMqConfig.amqpAdmin().declareExchange(new DirectExchange("ForceCI"));

}

@RequestMapping(value = "/createDynamicQueues", method = RequestMethod.GET)
public void createDynamicQueues(@RequestParam String branchName, ServletResponse response, ServletRequest
        request) throws URISyntaxException {
    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties(branchName);

    System.out.println("develop -> "+develop);
    if(develop != null && develop.stringPropertyNames() != null && !develop.stringPropertyNames().isEmpty()) {
        for (String stringPropertyName : develop.stringPropertyNames()) {
            String property = develop.getProperty(stringPropertyName);
            System.out.println("property Value -> " + property + " ---- " + "property key -> " + stringPropertyName);
        }
    } else {
                    Queue queue = new Queue(branchName, true);
        String develop1 = rabbitMqConfig.amqpAdmin().declareQueue(new Queue(branchName, true));
        rabbitMqConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange("ForceCI")).withQueueName());
        System.out.println(develop1);
    }
}

@RequestMapping(value = "/sendMessageToQueuesDevelop", method = RequestMethod.GET)
public void sendMessageToQueuesDevelop(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("develop");
    String queue_name = develop.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage");


}

@RequestMapping(value = "/sendMessageToQueuesMaster", method = RequestMethod.GET)
public void sendMessageToQueuesMaster(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("master");
    String queue_name = develop.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage1");


}

UPDATE

[绑定丢失,当我按上面的代码所示进行绑定时,消息开始进入,但是我仍然无法弄清楚如何在不同的侦听器中侦听这些消息并在不同的线程中处理它们?

java rabbitmq amqp spring-rabbitmq
1个回答
0
投票

最简单的方法是使用DirectMessageListenerContainer并根据需要向其中添加队列。但是,您不会为每个队列获得一个新线程;使用直接容器,将在amqp-client线程池中的线程上调用侦听器。

直接容器在添加队列方面很有效;您可以根据需要从零队列开始。有关更多信息,请参见Choosing a container

如果每个队列必须有一个新线程,则必须为每个队列手动创建(和管理)SimpleMessageListenerContainer

© www.soinside.com 2019 - 2024. All rights reserved.