同一个 Spring Boot 应用程序中的订阅者和生产者

问题描述 投票:0回答:1

我有一个正在运行的 RabbitMQ 服务器和两个 Spring Boot 应用程序。我们称之为

appA
appB

appA
每5m订阅一条消息到
queue1

appB
使用
queue1
并进行一些数据转换,并且应该向
queue2
发送(订阅)多条消息以供其他应用程序使用,例如
appC
appD
使用它。

在这种情况下

appB
同时是消费者/订阅者。

我正在使用 Spring Boot 自动配置(来自 application.properties),目前我正在按预期从

queue1
读取消息,但是当尝试发送消息时,我收到如下错误:

Cannot invoke "org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(String, String, Object)" because "this.rabbitTemplate" is null

是否可以在同一个 Spring Boot 应用程序中拥有消费者和订阅者?

谢谢

spring-boot rabbitmq
1个回答
0
投票

是的,有可能。

看这个代码

  1. 这是兔子配置
package com.submodule.conf;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;

@Configuration
public class RabbitConf {



    public final static String alarmQueue = "ALARM";

    public final static String rmmQueue = "RMM" ;


    public final static String exchange = "RMM-exchange";

    public final static String routingAlarmKey = "alarm_routing_key";

    public final static String routingRmmKey = "rmm_routing_key";

    // spring bean for rabbitmq queue
    @Bean
    public Queue AlarmQueue(){
        return new Queue(alarmQueue);
    }

    // spring bean for queue (store json messages)
    @Bean
    public Queue rmmQueue(){
        return new Queue(rmmQueue);
    }

    // spring bean for rabbitmq exchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange(exchange);
    }

    // binding between queue and exchange using routing key
    @Bean
    public Binding binding(){
        return BindingBuilder
                .bind(AlarmQueue())
                .to(exchange())
                .with(routingAlarmKey);
    }

    // binding between json queue and exchange using routing key
    @Bean
    public Binding jsonBinding(){
        return BindingBuilder
                .bind(rmmQueue())
                .to(exchange())
                .with(routingRmmKey);
    }



    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

这是兔子消费者


package com.submodule.rabbit;

import com.google.gson.Gson;
import com.submodule.entity.Threshold;
import com.submodule.service.ThresholdService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.concurrent.CountDownLatch;

@Component
public class Receiver {

    private final ThresholdService thresholdService;
    private final Gson gson;


    private CountDownLatch latch = new CountDownLatch(1);

    public Receiver(ThresholdService thresholdService, Gson gson) {
        this.thresholdService = thresholdService;
        this.gson = gson;

    }


    @Transactional
    @RabbitListener(queues = {"RMM"})
    public void receiveMessage(String threshold) {
        Threshold obj = gson.fromJson(threshold, Threshold.class);
        System.out.println("recieved object is " + obj);
        try {
            thresholdService.save(obj);
            latch.countDown();
        } catch (Exception exception) {
            System.out.println("something bad occurred");
        }

    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

这是一个在兔子队列上发送对象的控制器


package com.submodule.controller;

import com.google.gson.Gson;
import com.submodule.conf.RabbitConf;
import com.submodule.entity.Threshold;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
public class ThresholdController {


    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public ThresholdController( RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }


    @PostMapping("send")
    public void send(@RequestBody Threshold threshold){
        System.out.println("Sending message...");
        Gson gson = new Gson();
        String obj = gson.toJson(threshold);
        rabbitTemplate.convertAndSend(RabbitConf.exchange, RabbitConf.routingRmmKey, obj);
    }
}

这是我转移的对象


import java.util.Date;

public class Threshold {
    
    private String id;

    private Date date;

    private String thresholdUUID;

}

© www.soinside.com 2019 - 2024. All rights reserved.