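I am using RabbitMQ with Spring Boot. I have a consumer that currently listens on one queue and, depending on the message it receives, should send a new message to another queue. That is where I am stuck, and I hope you can help. I am not sending the message through REST; I send it directly with the convertAndSend method. Receiving messages works fine.
Consumer: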
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.server.ConfigurableWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableRabbit
public class SpringConsumer {
    private static final String QUEUE_NAME = "spring-boot4";
    private static final String OTHER_QUEUE_NAME = "resonse_queue";
    public static void main(String[] args) {
        SpringApplication.run(SpringConsumer.class, args);
    }
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        try {
            JSONObject json = new JSONObject(message);
            if ((boolean) json.get("cool")) {
                System.out.println("Neue 'wahre' Nachricht empfangen: " + json.toString());
                JSONObject responseJson = new JSONObject();
                responseJson.put("response", "success");
                // Here the response should be sent
            } else {
                System.out.println("Neue 'falsche' Nachricht empfangen: " + json.toString());
            }
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }
    @Bean
    public WebServerFactoryCustomizer<ConfigurableWebServerFactory> webServerFactoryCustomizer() {
        return factory -> factory.setPort(9090); // Sets the port to 9090
    }
}
And my producer:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class MessagingRabbitmqApplication {
    static final String topicExchangeName = "spring-boot-exchange";
    static final String queueName = "spring-boot4";
    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        container.setAutoStartup(false); // Set autoStartup to false
        return container;
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
    }
}
Thanks in advance for your help, Paul
So the following approach does not work:
@SpringBootApplication
@EnableRabbit
public class SpringConsumer {
    private static final String QUEUE_NAME = "spring-boot4";
    private static final String OTHER_QUEUE_NAME = "other-queue";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public static void main(String[] args) {
        SpringApplication.run(SpringConsumer.class, args);
    }
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        try {
            JSONObject json = new JSONObject(message);
            if ((boolean) json.get("cool")) {
                System.out.println("Neue 'wahre' Nachricht empfangen: " + json.toString());
                // Example: send a new message to another queue
                JSONObject responseJson = new JSONObject();
                responseJson.put("response", "success");
                rabbitTemplate.convertAndSend(OTHER_QUEUE_NAME, responseJson.toString());
            } else {
                System.out.println("Neue 'falsche' Nachricht empfangen: " + json.toString());
            }
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }
    @Bean
    public WebServerFactoryCustomizer<ConfigurableWebServerFactory> webServerFactoryCustomizer() {
        return factory -> factory.setPort(9090);
    }
    @Bean
    public RabbitTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
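It says:
APPLICATION FAILED TO START
Description:
The dependencies of some of the beans in the application context form a cycle:
tomcatServletWebServerFactory defined in class path resource [org/springframework/boot/autoconfigure/web/servlet/ServletWebServerFactoryConfiguration$EmbeddedTomcat.class]
┌──────┐
| springConsumer (field private org.springframework.amqp.rabbit.core.RabbitTemplate SpringConsumerResponder_akaDirk.SpringConsumer.rabbitTemplate)
└──────┘
Action:
Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.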
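For reference, the last-resort switch named in that message is an ordinary Spring Boot property. Below is a minimal sketch of how it would be set, assuming the project keeps its settings in src/main/resources/application.properties (that file is my assumption; it is not shown above). I have not confirmed that this makes the application start, and the message itself recommends removing the cycle first.

```properties
# Last-resort workaround quoted from the startup error:
# lets Spring try to break circular bean references automatically.
spring.main.allow-circular-references=true
```

The cycle reported here involves only springConsumer and its own rabbitTemplate field, so the cleaner route is probably to change where that bean is declared rather than to enable this flag.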