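I'm working on a homework assignment for learning Java and RabbitMQ. I'm not familiar with Spring or RabbitMQ, and I can't solve this problem.
I have two separate applications.
The first one produces messages (the bolid application). I created a message producer (bolid) that sends a message to a listener every 10 seconds: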
@SpringBootApplication
public class BolidApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(BolidApplication.class, args);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(String... args) throws Exception {
        Bolid bolid = new Bolid();
        int i = 10;
        while (true) {
            bolid.setData(new Date());
            rabbitTemplate.setReplyAddress("bolidReply");
            rabbitTemplate.convertAndSend("RaceExchange", "raceRouting", bolid.toString());
            rabbitTemplate.convertAndSend("MonitorExchange", "raceRouting", bolid.toString());
            Thread.sleep(15000);
            i += 10;
        }
    }
}
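So I created two queues (RaceQueue and MonitorQueue), defined the exchanges, and bound them.
I have two listeners: RaceListener and MonitorListener.
The second application is the listener. Here is its configuration: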
public class RabbitConfig {

    private static final String RACE_QUEUE = "RaceQueue";
    private static final String MONITOR_QUEUE = "MonitorQueue";

    @Bean
    Queue myQueue() {
        return new Queue(RACE_QUEUE, true);
    }

    @Bean
    Queue monitorQueue() {
        return new Queue(MONITOR_QUEUE, true);
    }

    @Bean
    Exchange myExchange() {
        return ExchangeBuilder.topicExchange("RaceExchange")
                .durable(true)
                .build();
    }

    @Bean
    Exchange monitorExchange() {
        return ExchangeBuilder.topicExchange("MonitorExchange")
                .durable(true)
                .build();
    }

    @Bean
    Binding binding() {
        // return new Binding(MY_QUEUE, Binding.DestinationType.QUEUE, "MyTopicExchange", "topic", null)
        return BindingBuilder
                .bind(myQueue())
                .to(myExchange())
                .with("raceRouting")
                .noargs();
    }

    @Bean
    Binding monitorBinding() {
        return BindingBuilder
                .bind(monitorQueue())
                .to(monitorExchange())
                .with("raceRouting")
                .noargs();
    }

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        return cachingConnectionFactory;
    }

    @Bean
    MessageListenerContainer rabbitRaceListener() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
        simpleMessageListenerContainer.setQueues(myQueue());
        simpleMessageListenerContainer.setupMessageListener(new RabbitRaceListener());
        return simpleMessageListenerContainer;
    }

    @Bean
    MessageListenerContainer rabbitMonitorListener() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
        simpleMessageListenerContainer.setQueues(monitorQueue());
        simpleMessageListenerContainer.setupMessageListener(new RabbitMonitorListener());
        return simpleMessageListenerContainer;
    }
}
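From the MonitorListener I want to use the reply pattern to send a reply back to my first application (the bolid application), so that the bolid application can receive my message.
My monitor listener code: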
public class RabbitMonitorListener implements MessageListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        String[] splitted = new String(message.getBody()).split("\\|");
        int oilTemperature = Integer.parseInt(splitted[1].split(" ")[2]);
        int engineTemperature = Integer.parseInt(splitted[2].split(" ")[2]);
        int tirePressure = Integer.parseInt(splitted[3].split(" ")[2]);
        System.out.println("message2 = [" + new String(message.getBody()) + "]");
        if (oilTemperature > 120 || engineTemperature > 120 || tirePressure > 12) {
            System.out.println("SEND REPLY TO BOLID!");
        }
        if (oilTemperature > 150 || engineTemperature > 150 || tirePressure > 17) {
            System.out.println("SEND REPLY TO BOLID!");
        }
    }
}
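How can I do that? How can I send a message back to bolid from here, and read it in the bolid application?
EDIT: I did some research, and I want to do something like this: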
public class RabbitMonitorListener implements MessageListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        String[] splitted = new String(message.getBody()).split("\\|");
        int oilTemperature = Integer.parseInt(splitted[1].split(" ")[2]);
        int engineTemperature = Integer.parseInt(splitted[2].split(" ")[2]);
        int tirePressure = Integer.parseInt(splitted[3].split(" ")[2]);
        String response = "Hello";
        MessageProperties properties = new MessageProperties();
        Message responseMessage = new Message(response.getBytes(), properties);
        rabbitTemplate.send(message.getMessageProperties().getReplyTo(), responseMessage);
        System.out.println("message2 = [" + new String(message.getBody()) + "]");
        if (oilTemperature > 120 || engineTemperature > 120 || tirePressure > 12) {
            System.out.println("WARN MECHANICS");
        }
        if (oilTemperature > 150 || engineTemperature > 150 || tirePressure > 17) {
            System.out.println("WARN MECHANICS");
        }
    }
}
But rabbitTemplate is null here, so @Autowired does not work in this class. How can I access rabbitTemplate and its send method inside a MessageListener?
new RabbitRaceListener() must be a @Bean too, in order to get autowiring.
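The same applies to RabbitMonitorListener, which is the class with the @Autowired field. A minimal sketch of what that could look like follows. The class name MonitorListenerConfig and the bean method names are made up for illustration, and it assumes that a RabbitTemplate bean exists in the listener application and that these beans replace the rabbitMonitorListener() container from the question rather than sit next to it.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; RabbitMonitorListener is the class from the question.
@Configuration
public class MonitorListenerConfig {

    // Because Spring creates and manages this instance, its @Autowired fields get injected.
    @Bean
    RabbitMonitorListener monitorListener() {
        return new RabbitMonitorListener();
    }

    // The container receives the managed listener instead of calling "new" itself.
    @Bean
    MessageListenerContainer monitorListenerContainer(
            ConnectionFactory connectionFactory,
            @Qualifier("monitorQueue") Queue monitorQueue,
            RabbitMonitorListener monitorListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(monitorQueue);
        container.setupMessageListener(monitorListener);
        return container;
    }
}
```

An instance created with new is invisible to Spring, which is why the field stayed null in the question.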
However, you are overcomplicating this; the framework can take care of all of it for you.
See Request/Reply Messaging for the client side, and use convertSendAndReceive() or convertSendAndReceiveAsType().
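On the bolid side, that could look roughly like the sketch below. The class MonitorClient and its method name are hypothetical; the exchange and routing key are the ones from the question, and it assumes the reply body converts to a String.

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

// Hypothetical helper for the bolid application.
@Component
public class MonitorClient {

    private final RabbitTemplate rabbitTemplate;

    public MonitorClient(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends the payload and blocks until a reply arrives
     * or the template's reply timeout elapses.
     */
    public String sendAndAwaitReply(String payload) {
        Object reply = rabbitTemplate.convertSendAndReceive(
                "MonitorExchange", "raceRouting", payload);
        // null means that no reply arrived before the timeout.
        return reply == null ? null : reply.toString();
    }
}
```

With this approach the template handles the reply address and correlation itself, so the manual setReplyAddress("bolidReply") call should not be needed for this exchange of messages.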
On the server side, see Annotation-driven Listener Endpoints.
@RabbitListener(queues = "request")
public String handle(String in) {
    return in.toUpperCase();
}
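For the monitor case, the value returned from such a handler could be computed by a plain method that reuses the thresholds from the question's listener. This is only a sketch: the class name MonitorReply, the reply strings OK, WARNING and CRITICAL, and the sample message layout used in the usage note are assumptions, since the question does not show what Bolid.toString() produces.

```java
// Hypothetical helper; uses only the parsing and thresholds shown in the question.
final class MonitorReply {

    private MonitorReply() {
    }

    // Mirrors split(" ")[2] from the question: the third space-separated token of a field.
    private static int thirdToken(String field) {
        return Integer.parseInt(field.split(" ")[2]);
    }

    /** Maps a '|'-separated message body to a reply string. */
    static String evaluate(String body) {
        String[] fields = body.split("\\|");
        int oilTemperature = thirdToken(fields[1]);
        int engineTemperature = thirdToken(fields[2]);
        int tirePressure = thirdToken(fields[3]);

        // Check the stricter limits first so the most severe reply wins.
        if (oilTemperature > 150 || engineTemperature > 150 || tirePressure > 17) {
            return "CRITICAL";
        }
        if (oilTemperature > 120 || engineTemperature > 120 || tirePressure > 12) {
            return "WARNING";
        }
        return "OK";
    }
}
```

A handler like the one above could then return MonitorReply.evaluate(in) instead of in.toUpperCase(). Assuming a body such as "bolid|oil temp 130|engine temp 90|tire pressure 10", the method returns WARNING.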