I am using org.eclipse.paho.mqttv5.client with Spring Integration and trying to set the MQTT no-local option on a subscription. My inbound adapter is configured as follows:
    @Bean
    public MessageProducer inbound(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
        Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
                clientManager,
                "test"
        );
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        adapter.connectComplete(true);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
However, Mqttv5PahoMessageDrivenChannelAdapter offers no way to supply an MqttSubscription, which is the object that carries the MQTT no-local setting. The Mqttv5PahoMessageDrivenChannelAdapter class has a private subscribe method:
    private void subscribe() {
        var clientManager = getClientManager();
        if (clientManager != null && this.mqttClient == null) {
            this.mqttClient = clientManager.getClient();
        }
        String[] topics = getTopic();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        this.topicLock.lock();
        try {
            if (topics.length == 0) {
                return;
            }
            int[] requestedQos = getQos();
            MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
                    .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
                    .toArray(MqttSubscription[]::new);
            IMqttMessageListener listener = this::messageArrived;
            IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
                    .mapToObj(t -> listener)
                    .toArray(IMqttMessageListener[]::new);
            this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
                    .waitForCompletion(getCompletionTimeout());
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            logger.debug(message);
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
            }
        }
        catch (MqttException ex) {
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
            }
            logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
        }
        finally {
            this.topicLock.unlock();
        }
    }
But it creates each MqttSubscription from only the topic and qos, so there is no place to set no-local:

    MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
            .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
            .toArray(MqttSubscription[]::new);
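
This is something we missed when introducing MQTT v5 support.

It looks like we need to introduce something like an MqttSubscription-based constructor as an alternative to plain topics and their qos. That way you would be able to configure each subscription in a fine-grained way.

Please raise a GitHub issue, and we will address it in the next Spring Integration version.

As a workaround, I can only suggest using the Paho API directly. A custom MessageProducerSupport implementation can be used to connect it to the rest of the integration flow in your project.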
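
The workaround could look roughly like the sketch below. It is not part of Spring Integration: the class name NoLocalMqttProducer and the helper noLocalSubscription are hypothetical, and the sketch assumes spring-integration-mqtt and the Paho v5 client are on the classpath, that the IMqttAsyncClient passed in is already connected, and that forwarding the raw byte[] payload is acceptable. The subscribe call mirrors the one the adapter makes, except that the MqttSubscription has no-local switched on.

```java
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical producer that subscribes through the Paho API directly.
class NoLocalMqttProducer extends MessageProducerSupport {

    private final IMqttAsyncClient client; // assumed to be connected already
    private final String topic;
    private final int qos;
    private final long completionTimeout;

    NoLocalMqttProducer(IMqttAsyncClient client, String topic, int qos, long completionTimeout) {
        this.client = client;
        this.topic = topic;
        this.qos = qos;
        this.completionTimeout = completionTimeout;
    }

    // Builds a subscription with the MQTT v5 no-local option enabled.
    static MqttSubscription noLocalSubscription(String topic, int qos) {
        MqttSubscription subscription = new MqttSubscription(topic, qos);
        subscription.setNoLocal(true);
        return subscription;
    }

    @Override
    protected void doStart() {
        MqttSubscription[] subscriptions = { noLocalSubscription(this.topic, this.qos) };
        // Forward every arriving payload to the configured output channel.
        IMqttMessageListener[] listeners = {
            (receivedTopic, mqttMessage) -> sendMessage(
                    MessageBuilder.withPayload(mqttMessage.getPayload())
                            .setHeader(MqttHeaders.RECEIVED_TOPIC, receivedTopic)
                            .build())
        };
        try {
            // Same argument shape as the adapter's own subscribe() call.
            this.client.subscribe(subscriptions, null, null, listeners, null)
                    .waitForCompletion(this.completionTimeout);
        }
        catch (MqttException ex) {
            throw new IllegalStateException("Error subscribing to " + this.topic, ex);
        }
    }

    @Override
    protected void doStop() {
        try {
            this.client.unsubscribe(this.topic).waitForCompletion(this.completionTimeout);
        }
        catch (MqttException ex) {
            logger.error(ex, () -> "Error unsubscribing from " + this.topic);
        }
    }
}
```

Such a producer would be declared as a @Bean in place of the adapter, with setOutputChannel(mqttInputChannel()) called on it so that downstream handlers receive the messages as before. Connection handling, reconnects and payload conversion are left out and would need to be added for real use.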