我有问题,我不知道如何动态设置主机并在其他主机上执行RPC操作
这里是情况
我在不同的服务器和网络(即192.168.1.0/24、192.168.2.0/24)上运行了多个RabbitMQ。
该行为是我有一个IP地址列表,我将使用该IP地址执行RPC。因此,对于ip地址列表中的每个条目,我想执行convertSendAndReceive
并处理回复等。
尝试过文档中的一些代码,但是即使无效地址(没有运行有效RabbitMQ的地址,或者网络上不存在事件的地址,例如1.1.1.1)似乎也无法正常工作RabbitMQ(例如在192.168.1.1上运行)
注:我可以在正确的地址上成功执行RPC调用,但是,我也可以在无效的地址上成功执行RPC调用,这并不是我想做的
任何人对此有任何想法吗?
这是我的出处
TaskSchedulerConfiguration.java
@Configuration
@EnableScheduling
public class TaskSchedulerConfiguration {
@Autowired
private IpAddressRepo ipAddressRepo;
@Autowired
private RemoteProcedureService remote;
@Scheduled(fixedDelayString = "5000", initialDelay = 2000)
public void scheduledTask() {
ipAddressRepo.findAll().stream()
.forEach(ipaddress -> {
boolean status = false;
try {
remote.setIpAddress(ipaddress);
remote.doSomeRPC();
} catch (Exception e) {
logger.debug("Unable to Connect to licenser server: {}", license.getIpaddress());
logger.debug(e.getMessage(), e);
}
});
}
}
RemoteProcedureService.java
@Service
public class RemoteProcedureService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange exchange;
public boolean doSomeRPC() throws JsonProcessingException {
//I passed this.factory.getHost() so that i will know if only the valid ip address will be received by the other side
//at this point, other side receives invalid ipaddress which supposedly will not be receive by the oher side
boolean response = (Boolean) template.convertSendAndReceive(exchange.getName(), "rpc", this.factory.getHost());
return response;
}
public void setIpAddress(String host) {
factory.setHost(host);
factory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
factory.setPort(prop.getRabbitMQPort());
factory.setUsername(prop.getRabbitMQUsername());
factory.setPassword(prop.getRabbitMQPassword());
template.setConnectionFactory(factory);
}
}
AmqpConfiguration.java
@Configuration
public class AmqpConfiguration {
public static final String topicExchangeName = "testExchange";
public static final String queueName = "rpc";
@Autowired
private LicenseVisualizationProperties prop;
//Commented this out since this will only be assigne once
//i need to achieve to set it dynamically in order to send to different hosts
//so put it in RemoteProcedureService.java, but it never worked
// @Bean
// public ConnectionFactory connectionFactory() {
// CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// connectionFactory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
// connectionFactory.setPort(prop.getRabbitMQPort());
// connectionFactory.setUsername(prop.getRabbitMQUsername());
// connectionFactory.setPassword(prop.getRabbitMQPassword());
// return connectionFactory;
// }
@Bean
public DirectExchange exhange() {
return new DirectExchange(topicExchangeName);
}
}
UPDATE 1
[似乎在循环期间,当在CachingConnectionFactory
后续ip寻址循环中设置有效ip时,无论有效还是无效,都会被CachingConnectionFactory
中设置的第一个有效ip接收
UPDATE 2
我发现,一旦可以建立成功的连接,就不会创建新的连接。您如何强制RabbitTemplate
建立新连接?
但是,请回答您的问题:
呼叫resetConnection()
关闭连接。