在Spring Boot上为Spring AMQP和RabbitMQ动态设置主机

问题描述 投票:0回答:1

我有问题,我不知道如何动态设置主机并在其他主机上执行RPC操作

这里是情况

我在不同的服务器和网络(即192.168.1.0/24、192.168.2.0/24)上运行了多个RabbitMQ。

该行为是我有一个IP地址列表,我将使用该IP地址执行RPC。因此,对于ip地址列表中的每个条目,我想执行convertSendAndReceive并处理回复等。

尝试过文档中的一些代码,但是即使无效地址(没有运行有效RabbitMQ的地址,或者网络上不存在事件的地址,例如1.1.1.1)似乎也无法正常工作RabbitMQ(例如在192.168.1.1上运行)

注:我可以在正确的地址上成功执行RPC调用,但是,我也可以在无效的地址上成功执行RPC调用,这并不是我想做的

任何人对此有任何想法吗?

这是我的出处

TaskSchedulerConfiguration.java

@Configuration
@EnableScheduling
public class TaskSchedulerConfiguration {
    @Autowired
    private IpAddressRepo ipAddressRepo;

    @Autowired
    private RemoteProcedureService remote;

    @Scheduled(fixedDelayString  = "5000", initialDelay = 2000)
    public void scheduledTask() {
        ipAddressRepo.findAll().stream()
              .forEach(ipaddress -> {
                boolean status = false;
                try {
                    remote.setIpAddress(ipaddress);
                    remote.doSomeRPC();                 
                } catch (Exception e) {
                    logger.debug("Unable to Connect to licenser server: {}", license.getIpaddress());
                    logger.debug(e.getMessage(), e);
                } 
              });

    }

}

RemoteProcedureService.java

@Service
public class RemoteProcedureService {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange exchange;


    public boolean doSomeRPC() throws JsonProcessingException {

        //I passed this.factory.getHost() so that i will know if only the valid ip address will be received by the other side
        //at this point, other side receives invalid ipaddress which supposedly will not be receive by the oher side
        boolean response = (Boolean) template.convertSendAndReceive(exchange.getName(), "rpc", this.factory.getHost());
        return response;
    }

    public void setIpAddress(String host) {
        factory.setHost(host);
        factory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
        factory.setPort(prop.getRabbitMQPort());
        factory.setUsername(prop.getRabbitMQUsername());
        factory.setPassword(prop.getRabbitMQPassword());
        template.setConnectionFactory(factory);

    }

}

AmqpConfiguration.java

@Configuration
public class AmqpConfiguration {
    public static final String topicExchangeName = "testExchange";

    public static final String queueName = "rpc";

    @Autowired
    private LicenseVisualizationProperties prop;

//Commented this out since this will only be assigne once
//i need to achieve to set it dynamically in order to send to different hosts
//so put it in RemoteProcedureService.java, but it never worked
//    @Bean
//    public ConnectionFactory connectionFactory() {
//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//        connectionFactory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
//        connectionFactory.setPort(prop.getRabbitMQPort());
//        connectionFactory.setUsername(prop.getRabbitMQUsername());
//        connectionFactory.setPassword(prop.getRabbitMQPassword());
//        return connectionFactory;
//    }

    @Bean
    public DirectExchange exhange() {
        return new DirectExchange(topicExchangeName);
    }

}

UPDATE 1

[似乎在循环期间,当在CachingConnectionFactory后续ip寻址循环中设置有效ip时,无论有效还是无效,都会被CachingConnectionFactory中设置的第一个有效ip接收

UPDATE 2

我发现,一旦可以建立成功的连接,就不会创建新的连接。您如何强制RabbitTemplate建立新连接?

java spring rabbitmq spring-amqp
1个回答
0
投票
这是一个相当奇怪的用例,表现不佳;您最好拥有一组连接工厂和模板。

但是,请回答您的问题:

呼叫resetConnection()关闭连接。

© www.soinside.com 2019 - 2024. All rights reserved.