@EnableIntegration
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@IntegrationComponentScan(basePackages = "org.sample.mqtt")
public class MqttConfig {
private String[] serverUris;
private String username;
private char[] password;
private int keepAliveInterval;
private String[] subTopics;
private Class<? extends BytesMessageMapper> messageMapper;
private String clientIdPrefix;
private String modelPackages;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(serverUris);
options.setUserName(username);
options.setPassword(password);
options.setKeepAliveInterval(keepAliveInterval);
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MqttMessageConverter bytesMessageConverter() throws NoSuchMethodException {
BytesMessageMapper bytesMessageMapper = BeanUtils.instantiateClass(messageMapper.getConstructor(String.class), modelPackages);
return new BytesMessageConverter(bytesMessageMapper);
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttMessageConverter mqttMessageConverter) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdPrefix + "_outbound", mqttClientFactory());
messageHandler.setConverter(mqttMessageConverter);
messageHandler.setCompletionTimeout(5000);
messageHandler.setAsync(true);
return messageHandler;
}
@Bean
public MessageProducer mqttInbound(MqttMessageConverter mqttMessageConverter) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientIdPrefix + "_inbound", mqttClientFactory(), subTopics);
adapter.setConverter(mqttMessageConverter);
adapter.setOutputChannel(mqttInboundChannel());
adapter.setCompletionTimeout(5000);
adapter.setQos(1);
return adapter;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
MqttClient client = new MqttClient(serverUrl, clientId, new MemoryPersistence());
根据clientId,可以在HashMap中创建并维护多个MQTTClient。如果使用Spring-integration-MQTT如何才能达到同样的效果
如何通过 HTTP 请求创建多个 MQTT 客户端并在 HashMap 中维护它们
IntegrationFlowContext 动态注册 FlowContext.Registration(flow).register();
但还是不知道如何处理和使用。如何维护和管理每个MQTT客户端,以便可以根据hashMap的key来选择它来生产数据
我也遇到同样的问题,请问你有什么办法解决吗