How can I create multiple client IDs with Spring-Integration-MQTT and publish messages through them?

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.BeanUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mapping.BytesMessageMapper;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
// BytesMessageConverter is the asker's own class; its import is not shown in the question.

@EnableIntegration
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@IntegrationComponentScan(basePackages = "org.sample.mqtt")
public class MqttConfig {

    private String[] serverUris;
    private String username;
    private char[] password;
    private int keepAliveInterval;
    private String[] subTopics;
    private Class<? extends BytesMessageMapper> messageMapper;
    private String clientIdPrefix;
    private String modelPackages;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(serverUris);
        options.setUserName(username);
        options.setPassword(password);
        options.setKeepAliveInterval(keepAliveInterval);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MqttMessageConverter bytesMessageConverter() throws NoSuchMethodException {
        BytesMessageMapper bytesMessageMapper = BeanUtils.instantiateClass(messageMapper.getConstructor(String.class), modelPackages);
        return new BytesMessageConverter(bytesMessageMapper);
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttMessageConverter mqttMessageConverter) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdPrefix + "_outbound", mqttClientFactory());
        messageHandler.setConverter(mqttMessageConverter);
        messageHandler.setCompletionTimeout(5000);
        messageHandler.setAsync(true);
        return messageHandler;
    }

    @Bean
    public MessageProducer mqttInbound(MqttMessageConverter mqttMessageConverter) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientIdPrefix + "_inbound", mqttClientFactory(), subTopics);
        adapter.setConverter(mqttMessageConverter);
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return adapter;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }
}


MqttClient client = new MqttClient(serverUrl, clientId, new MemoryPersistence());

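With the plain Paho client shown above, I can create several MqttClient instances and keep them in a HashMap keyed by clientId. How can I achieve the same thing with Spring-Integration-MQTT?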
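For reference, the HashMap idea can be sketched without any MQTT dependency. This is only a minimal illustration: `ClientRegistry` and its methods are hypothetical names, and the type parameter `C` stands in for whatever client object is stored (for example a Paho `MqttClient`). A `ConcurrentHashMap` is used instead of a plain `HashMap` on the assumption that clients may be created from concurrent HTTP requests.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry that lazily creates and caches one client per clientId. */
class ClientRegistry<C> {

    private final Map<String, C> clients = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    /** @param factory builds a new client for a clientId that has not been seen yet */
    ClientRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for this id, creating it on first use. */
    C get(String clientId) {
        return clients.computeIfAbsent(clientId, factory);
    }

    /** Removes and returns the client so the caller can disconnect it, or null if unknown. */
    C remove(String clientId) {
        return clients.remove(clientId);
    }

    /** Number of clients currently registered. */
    int size() {
        return clients.size();
    }
}
```

With Paho, the factory would wrap `new MqttClient(serverUrl, id, new MemoryPersistence())`; that constructor throws the checked `MqttException`, so the lambda has to catch and rethrow it as an unchecked exception.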

Specifically, how can I create multiple MQTT clients in response to HTTP requests and maintain them in a HashMap?

I know that a flow can be registered dynamically through IntegrationFlowContext: flowContext.registration(flow).register();

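But I still don't know how to work with the result. How do I maintain and manage each MQTT client so that I can select one by its HashMap key and publish data through it?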
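One possible direction, sketched under several assumptions: register one outbound flow per clientId and use the registration id as the lookup key, so that IntegrationFlowContext itself plays the role of the HashMap. The class `MqttPublisherRegistry` and its method names are hypothetical, the sketch reuses the `mqttClientFactory` bean from the configuration above, and it sends String payloads because it does not set a custom converter. It needs spring-integration-mqtt on the classpath and has not been run against a broker.

```java
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.context.IntegrationFlowContext.IntegrationFlowRegistration;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/** Hypothetical helper: one dynamically registered outbound flow per MQTT clientId. */
@Component
public class MqttPublisherRegistry {

    private final IntegrationFlowContext flowContext;
    private final MqttPahoClientFactory clientFactory;

    public MqttPublisherRegistry(IntegrationFlowContext flowContext,
                                 MqttPahoClientFactory clientFactory) {
        this.flowContext = flowContext;
        this.clientFactory = clientFactory;
    }

    /** Registers an outbound flow for clientId unless one already exists. */
    public synchronized void register(String clientId) {
        if (flowContext.getRegistrationById(clientId) != null) {
            return; // already registered under this id
        }
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory);
        handler.setAsync(true);
        // A lambda flow starts from an implicit input channel and ends at the handler.
        IntegrationFlow flow = f -> f.handle(handler);
        flowContext.registration(flow).id(clientId).register();
    }

    /** Publishes payload to topic through the client registered under clientId. */
    public void publish(String clientId, String topic, String payload) {
        IntegrationFlowRegistration registration = flowContext.getRegistrationById(clientId);
        if (registration == null) {
            throw new IllegalArgumentException("No MQTT client registered for " + clientId);
        }
        registration.getInputChannel().send(
                MessageBuilder.withPayload(payload)
                        .setHeader(MqttHeaders.TOPIC, topic)
                        .build());
    }

    /** Removes the flow for clientId, which also stops and destroys its handler. */
    public synchronized void unregister(String clientId) {
        if (flowContext.getRegistrationById(clientId) != null) {
            flowContext.remove(clientId);
        }
    }
}
```

An HTTP controller could then call `register(clientId)` when a client is requested and `publish(clientId, topic, payload)` afterwards. Whether one flow per client scales well enough for a large number of client IDs is something to measure in your own setup.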

java spring-boot mqtt spring-integration-mqtt
1 Answer

I have run into the same problem. Did you find a way to solve it?
