Spring Integration MQTT 共享订阅

问题描述 投票:0回答:1

我的项目需要支持mqtt。我使用 RabbitMQ 作为代理。我开发了 Spring Boot 应用程序,并使用 Spring Integration MQTT。

@Configuration
public class MqttConfig {
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{ "tcp://localhost:1883" });
        options.setUserName("user_admin");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", clientFactory(), "example_topic");

        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("MESSAGE: " + message.getPayload());
            }

        };
    }
}

pom.xml

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.2.1</version>
</dependency>

我在不同的端口 8081 和 8082 运行同一应用程序的两个实例

  java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8081
  java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8082

我使用MQTTX Client Toolbox进行测试。 当我发送几条关于该主题的消息时

example_topic
它总是到达同一个端口 8082

MESSAGE: Hello World // port 8082

MESSAGE: Hello World // port 8082

MESSAGE: Hello World // port 8082

我如何实现 MQTT 共享订阅以允许客户端负载平衡,这意味着代理在特定主题的订阅客户端之间平均分配消息负载?

MESSAGE: Hello World // port 8081

MESSAGE: Hello World // port 8082

MESSAGE: Hello World // port 8081

MESSAGE: Hello World // port 8082
spring spring-boot rabbitmq mqtt spring-integration-mqtt
1个回答
0
投票

如果代理支持共享订阅,那么使用它们只需向客户端订阅的主题添加正确的前缀即可。

例如

$shared/group-identifier/example_topic

其中

group-identifier
是客户端集合的唯一ID

© www.soinside.com 2019 - 2024. All rights reserved.