我有一个正在运行的 RabbitMQ 服务器和两个 Spring Boot 应用程序。我们称之为
appA
和 appB
appA
每 5 分钟向 queue1 发布一条消息
。
appB
使用 queue1
进行一些数据转换,然后应向 queue2
发布多条消息,供其他应用程序(例如 appC
、appD
)使用。
在这种情况下
appB
同时是消费者和发布者。
我正在使用 Spring Boot 自动配置(来自 application.properties),目前我正在按预期从
queue1
读取消息,但是当尝试发送消息时,我收到如下错误:
Cannot invoke "org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(String, String, Object)" because "this.rabbitTemplate" is null
是否可以在同一个 Spring Boot 应用程序中同时拥有消费者和发布者?
谢谢
是的,有可能。
请看下面的代码:
package com.submodule.conf;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
/**
 * RabbitMQ wiring for the application: declares two queues, one topic
 * exchange, the bindings that connect them, and the template used to
 * publish messages.
 */
@Configuration
public class RabbitConf {

    /** Name of the alarm queue. */
    public final static String alarmQueue = "ALARM";

    /** Name of the queue that carries the JSON messages. */
    public final static String rmmQueue = "RMM";

    /** Name of the topic exchange both queues are bound to. */
    public final static String exchange = "RMM-exchange";

    /** Routing key that binds {@link #alarmQueue} to {@link #exchange}. */
    public final static String routingAlarmKey = "alarm_routing_key";

    /** Routing key that binds {@link #rmmQueue} to {@link #exchange}. */
    public final static String routingRmmKey = "rmm_routing_key";

    /**
     * Queue bean for {@link #alarmQueue}.
     *
     * <p>The method name is left capitalised on purpose: a {@code @Bean}
     * method's name is the default bean name, so renaming it would rename
     * the bean.
     *
     * @return the alarm queue declaration
     */
    @Bean
    public Queue AlarmQueue() {
        return new Queue(alarmQueue);
    }

    /**
     * Queue bean for {@link #rmmQueue} (stores JSON messages).
     *
     * @return the RMM queue declaration
     */
    @Bean
    public Queue rmmQueue() {
        return new Queue(rmmQueue);
    }

    /**
     * Topic exchange that both queues are bound to.
     *
     * @return the exchange declaration
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange);
    }

    /**
     * Binds the alarm queue to the exchange using {@link #routingAlarmKey}.
     *
     * @return the alarm queue binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(AlarmQueue())
                .to(exchange())
                .with(routingAlarmKey);
    }

    /**
     * Binds the JSON (RMM) queue to the exchange using {@link #routingRmmKey}.
     *
     * @return the RMM queue binding
     */
    @Bean
    public Binding jsonBinding() {
        return BindingBuilder
                .bind(rmmQueue())
                .to(exchange())
                .with(routingRmmKey);
    }

    /**
     * Template used to publish messages over the given connection factory.
     *
     * <p>The return type is the concrete {@link RabbitTemplate} rather than
     * the {@link AmqpTemplate} interface, so the container knows the real
     * bean type up front and can satisfy injection points that ask for
     * {@code RabbitTemplate}. The change is covariant, so code that expects
     * an {@code AmqpTemplate} keeps working.
     * NOTE(review): with Spring Boot auto-configuration this should also let
     * the auto-configured template back off - confirm for the Boot version
     * in use.
     *
     * @param connectionFactory connection factory the template sends through
     * @return a template bound to {@code connectionFactory}
     */
    @Bean
    public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}
这是 RabbitMQ 消费者:
package com.submodule.rabbit;
import com.google.gson.Gson;
import com.submodule.entity.Threshold;
import com.submodule.service.ThresholdService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.CountDownLatch;
/**
 * Listener for the "RMM" queue: parses each incoming JSON string into a
 * {@link Threshold}, hands it to {@link ThresholdService#save}, and counts
 * down a latch after a successful save.
 */
@Component
public class Receiver {

    private final ThresholdService thresholdService;
    private final Gson gson;
    // Counted down once after the first successful save; never reassigned.
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * @param thresholdService service used to save received thresholds
     * @param gson             JSON parser used to decode message bodies
     */
    public Receiver(ThresholdService thresholdService, Gson gson) {
        this.thresholdService = thresholdService;
        this.gson = gson;
    }

    /**
     * Handles one message from the "RMM" queue.
     *
     * <p>Parsing happens inside the {@code try} so that a malformed payload
     * is reported the same way as a failed save instead of escaping the
     * listener.
     * NOTE(review): with default listener-container settings an exception
     * thrown from a listener typically causes the message to be requeued
     * and redelivered - confirm against the container configuration.
     *
     * @param threshold JSON text of a {@link Threshold}
     */
    @Transactional
    @RabbitListener(queues = {"RMM"})
    public void receiveMessage(String threshold) {
        try {
            Threshold obj = gson.fromJson(threshold, Threshold.class);
            if (obj == null) {
                // Gson yields null for a null or empty document; nothing to save.
                System.out.println("something bad occurred: empty payload");
                return;
            }
            System.out.println("recieved object is " + obj);
            thresholdService.save(obj);
            latch.countDown();
        } catch (Exception exception) {
            // Best-effort handling is kept, but the cause is no longer discarded.
            System.out.println("something bad occurred: " + exception);
        }
    }

    /**
     * @return the latch that is counted down after a successful save
     */
    public CountDownLatch getLatch() {
        return latch;
    }
}
这是一个向 RabbitMQ 队列发送对象的控制器:
package com.submodule.controller;
import com.google.gson.Gson;
import com.submodule.conf.RabbitConf;
import com.submodule.entity.Threshold;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
/**
 * REST endpoint that serialises a posted {@link Threshold} to JSON and
 * publishes it to the RabbitMQ exchange.
 */
@RestController
public class ThresholdController {

    // One shared serializer instead of a new Gson per request.
    private static final Gson GSON = new Gson();

    private final RabbitTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate template used to publish messages
     */
    @Autowired
    public ThresholdController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Publishes the request body as a JSON string to
     * {@link RabbitConf#exchange} with routing key
     * {@link RabbitConf#routingRmmKey}.
     *
     * @param threshold object taken from the request body
     */
    @PostMapping("send")
    public void send(@RequestBody Threshold threshold) {
        System.out.println("Sending message...");
        String payload = GSON.toJson(threshold);
        rabbitTemplate.convertAndSend(RabbitConf.exchange, RabbitConf.routingRmmKey, payload);
    }
}
这是我传输的对象:
import java.util.Date;
/**
 * Data object with an id, a date and a threshold UUID.
 *
 * <p>The fields are private and have no accessors, so they are only
 * reachable reflectively (for example by a JSON mapper).
 */
public class Threshold {
    private String id;
    private Date date;
    private String thresholdUUID;

    /**
     * Returns a readable dump of all fields, so that printing or
     * concatenating an instance shows its content instead of the default
     * identity hash.
     *
     * @return text of the form {@code Threshold{id=..., date=..., thresholdUUID=...}}
     */
    @Override
    public String toString() {
        return "Threshold{id=" + id
                + ", date=" + date
                + ", thresholdUUID=" + thresholdUUID
                + "}";
    }
}