Spring集成MQTT,发送响应到网关

问题描述 投票:0回答:1

服务:

Message<MQTTResponse> response = mqttGateway.sendToMqtt("topic", jsonData);
    log.info("Receive reply{}",String.valueOf(response.getPayload()));

我有这个

MessagingGateway

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel", defaultReplyChannel = "mqttResponseChannel")
public interface MqttGateway {
    Message<MQTTResponse> sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
}

约束:

sendToMqtt
将向一个主题发送请求,该主题的响应将来自另一个主题。我将用
mqttInputChannel

捕捉它
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(Message<String> message) throws JsonProcessingException {
    if (Objects.equals(message.getHeaders().get("mqtt_receivedTopic"), topicAck)) {
        ObjectMapper objectMapper = new ObjectMapper();
        MQTTResponse payloadMap = objectMapper.readValue(message.getPayload(), MQTTResponse.class);
        log.info("Payload: {}", payloadMap);

        MessageChannel mqttResponseChannel = mqttResponseChannel();
        Message<MQTTResponse> responseMessage = MessageBuilder.
           withPayload(payloadMap)
           .setReplyChannel(mqttResponseChannel)
           .build();
        log.info("Response Message: {}", responseMessage);
        mqttResponseChannel.send(responseMessage);
    }
}

线路

log.info("Response Message: {}", responseMessage);
之后,立即失去连接:
Lost connection: MqttException

过了一会儿:

java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getPayload()" because "response" is null 

我无法通过回复通道将从

handleMessage
函数收到的响应发送回服务
mqttResponseChannel

spring-boot spring-integration mqtt spring-integration-http
1个回答
0
投票

事情不会那么简单。

@MessagingGateway
是一种请求-回复模式,通过
replyChannel
标头与
TemporaryReplyChannel
值在请求和回复之间建立关联。

当您从另一个进程接收消息时,在您的情况下是 MQTT 入站通道适配器,标头中肯定没有该信息。并且您不能只是将消息发送到

mqttResponseChannel
,期望它与请求相关联。只是因为这条消息没有那个
replyChannel
标头。

通常,对于这种异步关联,当请求-答复仍然阻塞时,我建议使用聚合器解决方案。这真的不那么简单:

您将请求发送到 MQTT 并发送到带有一些唯一相关键的聚合器,该关联键将以某种方式出现在发送给 MQTT 的请求中。

当您收到来自回复主题的消息时,您也会发送到该聚合器。该回复消息必须具有与请求相同的相关密钥信息。然后聚合器执行请求和回复之间的关联。在发布函数中,您从请求消息中提取标头(通常是组中的第一个)。只需返回一条添加了这些标头的回复消息即可。这将包括提到的与发起者网关相关的

replyChannel
。您的聚合器不得有
outputChannel
,框架将选择
replyChannel
标头来生成消息。

此模式的关键部分是保留 MQTT 请求和回复中的相关密钥。

在文档中查看有关聚合器的更多信息:https://docs.spring.io/spring-integration/reference/aggregator.html

© www.soinside.com 2019 - 2024. All rights reserved.