How do I customize MqttSubscription in Spring Integration?


I am using `org.eclipse.paho.mqttv5.client` in Spring Integration and trying to set the MQTT `no local` option, with the adapter configured as follows:

    @Bean
    public MessageProducer inbound(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
        Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
                clientManager,
                "test"
        );
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        adapter.connectComplete(true);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

However, `Mqttv5PahoMessageDrivenChannelAdapter` provides no way to set an `MqttSubscription` (which is where the MQTT no-local setting lives).

The `Mqttv5PahoMessageDrivenChannelAdapter` class has a `subscribe` method:

    private void subscribe() {
        var clientManager = getClientManager();
        if (clientManager != null && this.mqttClient == null) {
            this.mqttClient = clientManager.getClient();
        }

        String[] topics = getTopic();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        this.topicLock.lock();
        try {
            if (topics.length == 0) {
                return;
            }

            int[] requestedQos = getQos();
            MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
                    .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
                    .toArray(MqttSubscription[]::new);
            IMqttMessageListener listener = this::messageArrived;
            IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
                    .mapToObj(t -> listener)
                    .toArray(IMqttMessageListener[]::new);
            this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
                    .waitForCompletion(getCompletionTimeout());
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            logger.debug(message);
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
            }
        }
        catch (MqttException ex) {
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
            }
            logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
        }
        finally {
            this.topicLock.unlock();
        }
    }

But it creates each `MqttSubscription` from the `topic` and `qos` parameters only:

    MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
            .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
            .toArray(MqttSubscription[]::new);

java spring-integration mqtt paho
1 Answer

This is something we missed when we introduced MQTT v5 support.

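It looks like we need to introduce something like an `MqttSubscription`-based constructor as an alternative to the plain `topic` and its `qos`. That way you could configure each subscription individually.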

Please raise a GH issue, and we will address it in the next Spring Integration release.

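As a workaround, I can only suggest using the Paho API directly. A custom `MessageProducerSupport` implementation can be used to connect it to the rest of the integration flow in your project.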
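A minimal sketch of what such a workaround might look like is below. It is not part of Spring Integration: the class name `NoLocalMqttProducer`, its constructor arguments, and the 5000 ms timeout are all made up for illustration. It assumes the `IMqttAsyncClient` passed in is already connected, and that it is the same client your application publishes with (for example, the one held by your `ClientManager`), since no-local only suppresses messages published over the same connection. Reconnection handling, events, and message conversion are left out.

```java
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical class: a hand-rolled inbound endpoint that subscribes with no-local set.
public class NoLocalMqttProducer extends MessageProducerSupport {

    // Illustrative timeout; mirrors adapter.setCompletionTimeout(5000) from the question.
    private static final long COMPLETION_TIMEOUT_MS = 5000;

    private final IMqttAsyncClient client; // assumed to be connected already
    private final String topic;
    private final int qos;

    public NoLocalMqttProducer(IMqttAsyncClient client, String topic, int qos) {
        this.client = client;
        this.topic = topic;
        this.qos = qos;
    }

    @Override
    protected void doStart() {
        // Build the subscription ourselves so the no-local flag can be set.
        MqttSubscription subscription = new MqttSubscription(this.topic, this.qos);
        subscription.setNoLocal(true);

        // Forward each received payload to the output channel as a byte[] message.
        IMqttMessageListener listener = (receivedTopic, message) -> sendMessage(
                MessageBuilder.withPayload(message.getPayload())
                        .setHeader(MqttHeaders.RECEIVED_TOPIC, receivedTopic)
                        .build());
        try {
            // Same subscribe overload the adapter's own subscribe() method uses.
            this.client.subscribe(new MqttSubscription[] {subscription}, null, null,
                            new IMqttMessageListener[] {listener}, null)
                    .waitForCompletion(COMPLETION_TIMEOUT_MS);
        }
        catch (MqttException ex) {
            throw new IllegalStateException("Error subscribing to " + this.topic, ex);
        }
    }

    @Override
    protected void doStop() {
        try {
            this.client.unsubscribe(this.topic).waitForCompletion(COMPLETION_TIMEOUT_MS);
        }
        catch (MqttException ex) {
            logger.warn(ex, () -> "Error unsubscribing from " + this.topic);
        }
    }
}
```

It could then be registered much like the adapter in the question, for example by creating `new NoLocalMqttProducer(client, "test", 2)` in a `@Bean` method and calling `setOutputChannel(mqttInputChannel())` on it. Note that the payload arrives as a raw `byte[]`, so any conversion the adapter would normally do has to happen downstream.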
