Spring Boot 消费者类也应该是生产者/rabbitmq

问题描述 投票:0回答:1

我正在使用 RabbitMQ 和 Spring Boot。我有一个消费者，当前正在监听一个队列，并且需要根据收到的消息向另一个队列发送一条新消息——这正是我遇到的问题，希望您能帮助我。我不是通过 REST 发送消息，而是想直接使用 convertAndSend 方法发送。消息的接收工作正常。

消费者:

import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.server.ConfigurableWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point that also hosts the RabbitMQ listener for the
 * {@code spring-boot4} queue.
 *
 * <p>Each incoming message is parsed as JSON and dispatched on its boolean
 * {@code "cool"} property. Sending the response to a second queue is not
 * implemented yet (see the TODO in {@link #receiveMessage(String)}).
 */
@SpringBootApplication
@EnableRabbit
public class SpringConsumer {

    /** Queue this application listens on. */
    private static final String QUEUE_NAME = "spring-boot4";

    /**
     * Queue the response is meant to go to; currently unused because nothing is sent.
     * NOTE(review): the value looks like a typo of "response_queue" - confirm against the broker.
     */
    private static final String OTHER_QUEUE_NAME = "resonse_queue";

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringConsumer.class, args);
    }


    /**
     * Handles one message taken from {@link #QUEUE_NAME}.
     *
     * @param message message body; expected to be a JSON object with a boolean
     *                {@code "cool"} property
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        try {
            JSONObject json = new JSONObject(message);
            // getBoolean reports a missing or non-boolean "cool" as a JSONException, which is
            // handled below. The previous unchecked cast threw a ClassCastException instead,
            // which escaped this method and failed the listener invocation.
            if (json.getBoolean("cool")) {
                System.out.println("Neue 'wahre' Nachricht empfangen: " + json.toString());
                JSONObject responseJson = new JSONObject();
                responseJson.put("response", "success");
                // TODO: the response should be sent to OTHER_QUEUE_NAME here
            } else {
                System.out.println("Neue 'falsche' Nachricht empfangen: " + json.toString());
            }

        } catch (JSONException e) {
            // Malformed payloads are reported and dropped.
            e.printStackTrace();
        }
    }
    

    /**
     * Customizes the embedded web server.
     *
     * @return a customizer that sets the server port to 9090
     */
    @Bean
    public WebServerFactoryCustomizer<ConfigurableWebServerFactory> webServerFactoryCustomizer() {
        return factory -> factory.setPort(9090); // the port is set to 9090 here
    }
}

还有我的生产者：

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot application that declares the RabbitMQ exchange, queue and binding,
 * and wires a listener container that delegates to a {@code Receiver} bean.
 *
 * <p>{@link #main(String[])} closes the application context as soon as startup
 * has finished.
 */
@SpringBootApplication
public class MessagingRabbitmqApplication {

    /** Name of the topic exchange declared by {@link #exchange()}. */
    static final String topicExchangeName = "spring-boot-exchange";

    /** Name of the queue declared by {@link #queue()} and read by the listener container. */
    static final String queueName = "spring-boot4";

    /**
     * Declares the queue named {@link #queueName}.
     *
     * @return the queue definition, flagged as durable
     */
    @Bean
    Queue queue() {
        final boolean durable = true;
        return new Queue(queueName, durable);
    }

    /**
     * Declares the topic exchange named {@link #topicExchangeName}.
     *
     * @return the exchange definition
     */
    @Bean
    TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange(topicExchangeName);
        return topicExchange;
    }

    /**
     * Binds the queue to the exchange.
     *
     * @param queue    queue to bind
     * @param exchange exchange the queue is bound to
     * @return a binding that uses the routing pattern {@code foo.bar.#}
     */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        final String routingPattern = "foo.bar.#";
        return BindingBuilder.bind(queue).to(exchange).with(routingPattern);
    }

    /**
     * Builds the listener container for {@link #queueName}.
     *
     * @param connectionFactory broker connection factory
     * @param listenerAdapter   listener that receives the messages
     * @return the configured container, with automatic startup switched off
     */
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        // Automatic startup is disabled for this container.
        listenerContainer.setAutoStartup(false);
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setMessageListener(listenerAdapter);
        listenerContainer.setQueueNames(queueName);
        return listenerContainer;
    }

    /**
     * Adapts the {@code Receiver} bean to a message listener.
     *
     * @param receiver delegate object
     * @return an adapter that invokes the delegate's {@code receiveMessage} method
     */
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        final String handlerMethod = "receiveMessage";
        return new MessageListenerAdapter(receiver, handlerMethod);
    }

    /**
     * Starts the application and closes its context right after startup.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws InterruptedException declared by the signature; nothing in this method body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
    }

}

提前感谢您的帮助。保罗

java spring-boot rabbitmq producer-consumer
1个回答
-1
投票

所以下面的方法不起作用

/**
 * Spring Boot entry point that consumes messages from {@code spring-boot4} and,
 * for messages whose {@code "cool"} property is true, publishes a response
 * through a {@link RabbitTemplate}.
 */
@SpringBootApplication
@EnableRabbit
public class SpringConsumer {

    /** Queue this application listens on. */
    private static final String QUEUE_NAME = "spring-boot4";

    /** Routing key (queue name) the response is published with. */
    private static final String OTHER_QUEUE_NAME = "other-queue";

    /** Template used to publish the response; produced by {@link #rabbitTemplate}. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringConsumer.class, args);
    }

    /**
     * Handles one message taken from {@link #QUEUE_NAME} and answers "cool" messages.
     *
     * @param message message body; expected to be a JSON object with a boolean
     *                {@code "cool"} property
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        try {
            JSONObject json = new JSONObject(message);
            // getBoolean reports a missing or non-boolean "cool" as a JSONException, which is
            // handled below. The previous unchecked cast threw a ClassCastException instead,
            // which escaped this method and failed the listener invocation.
            if (json.getBoolean("cool")) {
                System.out.println("Neue 'wahre' Nachricht empfangen: " + json.toString());
                // Example: send a new message to another queue.
                JSONObject responseJson = new JSONObject();
                responseJson.put("response", "success");
                // With this overload the first argument is the routing key; the message goes to
                // the template's default exchange.
                // NOTE(review): presumably a queue named "other-queue" must already exist on the
                // broker for the message to be delivered - confirm.
                rabbitTemplate.convertAndSend(OTHER_QUEUE_NAME, responseJson.toString());
            } else {
                System.out.println("Neue 'falsche' Nachricht empfangen: " + json.toString());
            }
        } catch (JSONException e) {
            // Malformed payloads are reported and dropped.
            e.printStackTrace();
        }
    }

    /**
     * Customizes the embedded web server.
     *
     * @return a customizer that sets the server port to 9090
     */
    @Bean
    public WebServerFactoryCustomizer<ConfigurableWebServerFactory> webServerFactoryCustomizer() {
        return factory -> factory.setPort(9090);
    }

    /**
     * Creates the {@link RabbitTemplate} bean.
     *
     * <p>The method is {@code static} on purpose. As an instance {@code @Bean} method it can
     * only be invoked on a fully created {@code SpringConsumer}, while {@code SpringConsumer}
     * itself needs the template injected into its field. Spring rejects that as a circular
     * reference at startup. A static factory method needs no instance, so the cycle disappears.
     *
     * @param connectionFactory broker connection factory supplied by the application context
     * @return a template bound to {@code connectionFactory}
     */
    @Bean
    public static RabbitTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

它说:

应用程序无法启动


描述:

应用程序上下文中的一些bean的依赖关系形成一个循环:

   tomcatServletWebServerFactory 在类路径资源中定义 [org/springframework/boot/autoconfigure/web/servlet/ServletWebServerFactoryConfiguration$EmbeddedTomcat.class]
┌─────┐
|  springConsumer（字段 private org.springframework.amqp.rabbit.core.RabbitTemplate SpringConsumerResponder_akaDirk.SpringConsumer.rabbitTemplate）
└─────┘

行动:

不鼓励依赖循环引用,并且默认情况下禁止它们。更新您的应用程序以消除 Bean 之间的依赖循环。作为最后的手段,可以通过将 spring.main.allow-circular-references 设置为 true 来自动打破循环。

© www.soinside.com 2019 - 2024. All rights reserved.