模块之间的 Azure IoT Edge 转发消息不包含属性(目标 Node-RED)(已解决)

问题描述 投票:0回答:1

我部署了一个带有 2 个模块的 IoT Edge 设备,一个是模拟温度传感器实例,另一个是Node-RED实例(均来自 Marketplace)。我可以将从第一个消息生成的消息转发到 Node-RED 的一个输入,但收到的消息不包含任何属性,系统也不是用户定义的。

为此 IoT Edge 设备配置的路由列表包括 2:

  1. NoderedToIotHub = FROM /messages/modules/nodered/outputs/* INTO $upstream
  2. SimulatedTemperatureSensorToNodered = FROM /messages/modules/SimulatedTemperatureSensor/* INTO BrokeredEndpoint(“/modules/nodered/inputs/input1”)

虽然 Node-RED 收到了该消息,但我无权访问属性(iothub-connection-device-id 等)。这是收到的消息:

{
    "payload": {
        "machine": {
            "temperature": 78.45809454307097,
            "pressure": 7.545858871995427
        },
        "ambient": {
            "temperature": 21.225943549765738,
            "humidity": 26
        },
        "timeCreated": "2024-05-10T12:40:41.4387644Z"
    },
    "topic": "input",
    "input": "input1",
    "_msgid": "ac58dc85f99232e7"
}

按照我在本网站的另一个线程中找到的建议,我在第二条路线的末尾添加了“WITH $includeProperties”,但仍然相同。我还尝试添加“WITH $systemProperties”来至少查看系统的,只是为了测试,但什么也没有。

有什么办法转发吗?在某些情况下我可能需要它们。

为了以防万一,市场中提供的 Node-RED 版本使用 node-red-contrib-azure-iot-edge-module (1.0.4)。我知道有一个替代且更新稍多的版本(node-red-contrib-azure-iot-edge-kpm),但看起来将来不会维护。

所以我进入查看这个模块的源代码,主文件azureiotedge.js看起来像这样:

module.exports = function (RED) {
    'use strict'

    var Transport = require('azure-iot-device-mqtt').Mqtt;
    var Client = require('azure-iot-device').ModuleClient;
    var Message = require('azure-iot-device').Message;

    var statusEnum = {
        disconnected: { color: "red", text: "Disconnected" },
        connected: { color: "green", text: "Connected" },
        sent: { color: "blue", text: "Sending message" },
        received: { color: "yellow", text: "Receiving message" },
        reported: { color: "blue", text: "Sending reported properties" },
        desired: { color: "yellow", text: "Receiving desired properties" },
        method: { color: "yellow", text: "Receiving direct method" },
        response: { color: "blue", text: "Sending method response" },
        error: { color: "grey", text: "Error" }
    };

    var edgeClient;
    var moduleTwin;
    var methodResponses = [];

    // Function to create the IoT Edge Client 
    function IoTEdgeClient(config) {
        // Store node for further use
        var node = this;
        node.connected = false;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);

        // Create the IoT Edge client
        Client.fromEnvironment(Transport, function (err, client) {
            if (err) {
                node.log('Module Client creation error:' + err);
            }
            else {
                client.on('error', function (err) {
                    node.log('Module Client error:' + err);
                });
                node.log('Module Client created.');
                // connect to the Edge instance
                client.open(function (err) {
                    if (err) {
                        node.log('Module Client open error:' + err);
                        throw err;
                    } else {
                        node.log('Module Client connected.');
                        edgeClient = client;
                        client.getTwin(function(err, twin) {
                            if (err) {
                                node.error('Could not get the module twin: ' + err);
                                throw err;
                            } else {
                                node.log('Module twin created.');
                                node.log('Twin contents:');
                                node.log(JSON.stringify(twin.properties));

                                node.on('close', function() {
                                    node.log('Azure IoT Edge Module Client closed.');
                                    edgeClient = null;
                                    moduleTwin = null;
                                    twin.removeAllListeners();
                                    client.removeAllListeners();
                                    client.close();
                                });
                                moduleTwin = twin;
                            }
                        });
                    }
                });
            }
        });
    }

    // Function to create the Module Twin 
    function ModuleTwin(config) {
        // Store node for further use
        var node = this;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);  

        // Get the twin
        getTwin().then(function(twin){
            setStatus(node, statusEnum.connected);
            // Register for changes
            twin.on('properties.desired', function(delta) {
                setStatus(node, statusEnum.desired);
                node.log('New desired properties received:');
                node.log(JSON.stringify(delta));
                node.send({payload: delta, topic: "desired"})
                setStatus(node, statusEnum.connected);
            });

            node.on('input', function (msg) {
                setStatus(node, statusEnum.reported);
                var messageJSON = null;
    
                if (typeof (msg.payload) != "string") {
                    messageJSON = msg.payload;
                } else {
                    //Converting string to JSON Object
                    messageJSON = JSON.parse(msg.payload);
                }

                twin.properties.reported.update(messageJSON, function(err) {
                    if (err) throw err;
                    node.log('Twin state reported.');
                    setStatus(node, statusEnum.connected);
                });
            });
        })
        .catch(function(err){
            node.log('Module Twin error:' + err);
        });
                        
        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Module input to receive input from edgeHub
    function ModuleInput(config) {
        // Store node for further use
        var node = this;
        node.input = config.input;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            // Act on module input messages
            node.log("Module Input created: " + node.input);
            client.on('inputMessage', function (inputName, msg) {
                outputMessage(client, node, inputName, msg);
            });
        })
        .catch(function(err){
            node.log("Module Input can't be loaded: " + err);
        });
       
        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Module output to send output to edgeHub 
    function ModuleOutput(config) {
        // Store node for further use
        var node = this;
        node.output = config.output;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            // React on input from node-red
            node.log("Module Output created: " + node.output);
            node.on('input', function (msg) {
                setStatus(node, statusEnum.sent);
                var messageJSON = null;

                if (typeof (msg.payload) != "string") {
                    messageJSON = msg.payload;
                } else {
                    //Converting string to JSON Object
                    messageJSON = JSON.parse(msg.payload);
                }

                var messageOutput = node.output;
                sendMessageToEdgeHub(client, node, messageJSON, messageOutput);
            });
        })
        .catch(function(err){
            node.error("Module Output can't be loaded: " + err);
        });

        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Module method to receive methods from IoT Hub 
    function ModuleMethod(config) {
        // Store node for further use
        var node = this;
        node.method = config.method;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            var mthd = node.method;
            node.log('Direct Method created: ' + mthd);
            client.onMethod(mthd, function(request, response) {
                // Set status
                setStatus(node, statusEnum.method);
                node.log('Direct Method called: ' + request.methodName);

                if(request.payload) {
                    node.log('Method Payload:' + JSON.stringify(request.payload));
                    node.send({payload: request.payload,topic: "method", method: request.methodName});
                }
                else {
                    node.send({payload: null,topic: "method", method: request.methodName});
                }

                getResponse(node).then(function(rspns){
                    var responseBody;
                    if (typeof (rspns.response) != "string") {
                        // Turn message object into string 
                        responseBody = JSON.stringify(rspns.response);
                    } else {
                        responseBody = rspns.response;
                    }
                    response.send(rspns.status, responseBody, function(err) {
                        if (err) {
                        node.log('Failed sending method response: ' + err);
                        } else {
                        node.log('Successfully sent method response.');
                        }
                    });
                })
                .catch(function(err){
                    node.error("Failed sending method response: response not received.");
                });
                // reset response
                node.response = null;

                setStatus(node, statusEnum.connected);
            }); 
            
            // Set method response on input
            node.on('input', function (msg) {
                var method = node.method;
                methodResponses.push(
                    {method: method, response: msg.payload, status: msg.status}
                );
                node.log("Module Method response set through node input: " + JSON.stringify(methodResponses.find(function(m){return m.method === method}))); 
            });
        })
        .catch(function(err){
            node.error("Module Method can't be loaded: " + err);
        });

        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Get module client using promise, and retry, and slow backoff
    function getClient(){
        var retries = 20;
        var timeOut = 1000;
        // Retrieve client using progressive promise to wait for module client to be opened
        var promise = Promise.reject();
        for(var i=1; i <= retries; i++) {
            promise = promise.catch( function(){
                    if (edgeClient){
                        return edgeClient;
                    }
                    else {
                        throw new Error("Module Client not initiated..");
                    }
                })
                .catch(function rejectDelay(reason) {
                    retries++;
                    return new Promise(function(resolve, reject) {
                        setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
                    });
                });
        }
        return promise;
    }

    // Get module twin using promise, and retry, and slow backoff
    function getTwin(){
        var retries = 10;
        var timeOut = 1000;
        // Retrieve twin using progressive promise to wait for module twin to be opened
        var promise = Promise.reject();
        for(var i=1; i <= retries; i++) {
            promise = promise.catch( function(){
                    if (moduleTwin){
                        return moduleTwin;
                    }
                    else {
                        throw new Error("Module Client not initiated..");
                    }
                })
                .catch(function rejectDelay(reason) {
                    return new Promise(function(resolve, reject) {
                        setTimeout(reject.bind(null, reason), timeOut * i);
                });
            });
        }
        return promise;
    }

    // Get module method response using promise, and retry, and slow backoff
    function getResponse(node){
        var retries = 20;
        var timeOut = 1000;
        var m = {};
        node.log("Module Method node method: " + node.method);
        // Retrieve client using progressive promise to wait for module client to be opened
        var promise = Promise.reject();
        for(var i=1; i <= retries; i++) {
            promise = promise.catch( function(){
                    var methodResponse = methodResponses.find(function(m){return m.method === node.method});
                    if (methodResponse){
                        // get the response and clean the array
                        var response = methodResponse;
                        node.log("Module Method response object found: " + JSON.stringify(response));
                        methodResponses.splice(methodResponses.findIndex(function(m){return m.method === node.method}),1);
                        return response;
                    }
                    else {
                        throw new Error("Module Method Response not initiated..");
                    }
                })
                .catch(function rejectDelay(reason) {
                    retries++;
                    return new Promise(function(resolve, reject) {
                        setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
                    });
                });
        }
        return promise;
    }

    // This function just sends the incoming message to the node output adding the topic "input" and the input name.
    var outputMessage = function(client, node, inputName, msg) {

        client.complete(msg, function (err) {
            if (err) {
                node.error('Failed sending message to node output:' + err);
                setStatus(node, statusEnum.error);
            }
        });

        if (inputName === node.input){
            setStatus(node, statusEnum.received);
            var message = JSON.parse(msg.getBytes().toString('utf8'));
            if (message) {
                node.log('Processed input message:' + inputName)
                // send to node output
                node.send({payload: message, topic: "input", input: inputName});
            }
            setStatus(node, statusEnum.connected);
        }   
    }

    var setStatus = function (node, status) {
        node.status({ fill: status.color, shape: "dot", text: status.text });
    }

    var sendMessageToEdgeHub = function (client, node, message, output) {

        // Send the message to IoT Edge
        if (!output)
        {
            output = "output";
        }
        node.log('Sending Message to Azure IoT Edge: ' + output + '\n   Payload: ' + JSON.stringify(message));
        var msg = new Message(JSON.stringify(message));
        client.sendOutputEvent(output, msg, function (err, res) {
            if (err) {
                node.error('Error while trying to send message:' + err.toString());
                setStatus(node, statusEnum.error);
            } else {
                node.log('Message sent.');
                setStatus(node, statusEnum.connected);
            }
        });
    }

    // Registration of the client into Node-RED
    RED.nodes.registerType("edgeclient", IoTEdgeClient, {
        defaults: {
            module: {value: ""}
        }
    });
    
    // Registration of the node into Node-RED
    RED.nodes.registerType("moduletwin", ModuleTwin, {
        defaults: {
            name: { value: "Module Twin" }
        }
    });

    // Registration of the node into Node-RED
    RED.nodes.registerType("moduleinput", ModuleInput, {
        defaults: {
            input: { value: "input1"}
        }
    });

    // Registration of the node into Node-RED
    RED.nodes.registerType("moduleoutput", ModuleOutput, {
        defaults: {
             output: { value: "output1"}
        }
    });

    // Registration of the node into Node-RED
    RED.nodes.registerType("modulemethod", ModuleMethod, {
        defaults: {
            method: { value: "method1"},
            response: { value: "{}"}
        }
    });

}

更详细地说,我们有 2 件相关的事情:

    function ModuleInput(config) {
        // Store node for further use
        var node = this;
        node.input = config.input;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            // Act on module input messages
            node.log("Module Input created: " + node.input);
            client.on('inputMessage', function (inputName, msg) {
                outputMessage(client, node, inputName, msg);
            });
        })
        .catch(function(err){
            node.log("Module Input can't be loaded: " + err);
        });
       
        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

所以,每次inputMessage事件发生时,执行的代码是:

    var outputMessage = function(client, node, inputName, msg) {

        client.complete(msg, function (err) {
            if (err) {
                node.error('Failed sending message to node output:' + err);
                setStatus(node, statusEnum.error);
            }
        });

        if (inputName === node.input){
            setStatus(node, statusEnum.received);
            var message = JSON.parse(msg.getBytes().toString('utf8'));
            if (message) {
                node.log('Processed input message:' + inputName)
                // send to node output
                node.send({payload: message, topic: "input", input: inputName});
            }
            setStatus(node, statusEnum.connected);
        }   
    }

所做的几乎是接收到的消息的 JSON.parse,没有任何更改。显然 Node-RED 会在内部添加标准内容(payload、_msgid 等),但只要属性包含在消息中,就应该转发它们。

我想这是一个大问题,属性是消息本身的一部分吗?如果不是,那就可以解释为什么我在 Node-RED 中看不到它们,而我可以使用 Azure IoT Explorer 看到它们,以防我只是让消息流向 IoT 中心。

问候

azure azure-iot-edge
1个回答
0
投票

要在 Azure IoT Edge 中的模块之间转发消息并包含属性,您需要确保属性与消息负载一起保留和转发。查看您当前的设置和代码,这些属性似乎没有明确包含在转发到 Node-RED 的消息负载中。

enter image description here

要在转发的消息中包含属性,您可以修改

outputMessage
文件中的
azureiotedge.js
函数。您需要从传入消息中提取属性,并将其与有效负载一起包含在将消息转发到 Node-RED 时。

修改

outputMessage
函数以包含属性:

var outputMessage = function(client, node, inputName, msg) {

    client.complete(msg, function (err) {
        if (err) {
            node.error('Failed sending message to node output:' + err);
            setStatus(node, statusEnum.error);
        }
    });

    if (inputName === node.input){
        setStatus(node, statusEnum.received);
        var message = JSON.parse(msg.getBytes().toString('utf8'));
        if (message) {
            // Extract properties from the incoming message
            var properties = msg.properties;
            // Include properties along with the payload
            var forwardedMessage = {
                payload: message,
                properties: properties, // Include properties
                topic: "input",
                input: inputName
            };
            node.log('Processed input message:' + inputName)
            // send to node output
            node.send(forwardedMessage);
        }
        setStatus(node, statusEnum.connected);
    }   
}

输出:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.