我部署了一个带有 2 个模块的 IoT Edge 设备,一个是模拟温度传感器实例,另一个是Node-RED实例(均来自 Marketplace)。我可以将从第一个消息生成的消息转发到 Node-RED 的一个输入,但收到的消息不包含任何属性,系统也不是用户定义的。
为此 IoT Edge 设备配置的路由列表包括 2:
虽然 Node-RED 收到了该消息,但我无权访问属性(iothub-connection-device-id 等)。这是收到的消息:
{
"payload": {
"machine": {
"temperature": 78.45809454307097,
"pressure": 7.545858871995427
},
"ambient": {
"temperature": 21.225943549765738,
"humidity": 26
},
"timeCreated": "2024-05-10T12:40:41.4387644Z"
},
"topic": "input",
"input": "input1",
"_msgid": "ac58dc85f99232e7"
}
按照我在本网站的另一个线程中找到的建议,我在第二条路线的末尾添加了“WITH $includeProperties”,但仍然相同。我还尝试添加“WITH $systemProperties”来至少查看系统的,只是为了测试,但什么也没有。
有什么办法转发吗?在某些情况下我可能需要它们。
为了以防万一,市场中提供的 Node-RED 版本使用 node-red-contrib-azure-iot-edge-module (1.0.4)。我知道有一个替代且更新稍多的版本(node-red-contrib-azure-iot-edge-kpm),但看起来将来不会维护。
所以我进入查看这个模块的源代码,主文件azureiotedge.js看起来像这样:
module.exports = function (RED) {
'use strict'
var Transport = require('azure-iot-device-mqtt').Mqtt;
var Client = require('azure-iot-device').ModuleClient;
var Message = require('azure-iot-device').Message;
var statusEnum = {
disconnected: { color: "red", text: "Disconnected" },
connected: { color: "green", text: "Connected" },
sent: { color: "blue", text: "Sending message" },
received: { color: "yellow", text: "Receiving message" },
reported: { color: "blue", text: "Sending reported properties" },
desired: { color: "yellow", text: "Receiving desired properties" },
method: { color: "yellow", text: "Receiving direct method" },
response: { color: "blue", text: "Sending method response" },
error: { color: "grey", text: "Error" }
};
var edgeClient;
var moduleTwin;
var methodResponses = [];
// Function to create the IoT Edge Client
function IoTEdgeClient(config) {
// Store node for further use
var node = this;
node.connected = false;
// Create the Node-RED node
RED.nodes.createNode(this, config);
// Create the IoT Edge client
Client.fromEnvironment(Transport, function (err, client) {
if (err) {
node.log('Module Client creation error:' + err);
}
else {
client.on('error', function (err) {
node.log('Module Client error:' + err);
});
node.log('Module Client created.');
// connect to the Edge instance
client.open(function (err) {
if (err) {
node.log('Module Client open error:' + err);
throw err;
} else {
node.log('Module Client connected.');
edgeClient = client;
client.getTwin(function(err, twin) {
if (err) {
node.error('Could not get the module twin: ' + err);
throw err;
} else {
node.log('Module twin created.');
node.log('Twin contents:');
node.log(JSON.stringify(twin.properties));
node.on('close', function() {
node.log('Azure IoT Edge Module Client closed.');
edgeClient = null;
moduleTwin = null;
twin.removeAllListeners();
client.removeAllListeners();
client.close();
});
moduleTwin = twin;
}
});
}
});
}
});
}
// Function to create the Module Twin
function ModuleTwin(config) {
// Store node for further use
var node = this;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
// Get the twin
getTwin().then(function(twin){
setStatus(node, statusEnum.connected);
// Register for changes
twin.on('properties.desired', function(delta) {
setStatus(node, statusEnum.desired);
node.log('New desired properties received:');
node.log(JSON.stringify(delta));
node.send({payload: delta, topic: "desired"})
setStatus(node, statusEnum.connected);
});
node.on('input', function (msg) {
setStatus(node, statusEnum.reported);
var messageJSON = null;
if (typeof (msg.payload) != "string") {
messageJSON = msg.payload;
} else {
//Converting string to JSON Object
messageJSON = JSON.parse(msg.payload);
}
twin.properties.reported.update(messageJSON, function(err) {
if (err) throw err;
node.log('Twin state reported.');
setStatus(node, statusEnum.connected);
});
});
})
.catch(function(err){
node.log('Module Twin error:' + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Module input to receive input from edgeHub
function ModuleInput(config) {
// Store node for further use
var node = this;
node.input = config.input;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
// Act on module input messages
node.log("Module Input created: " + node.input);
client.on('inputMessage', function (inputName, msg) {
outputMessage(client, node, inputName, msg);
});
})
.catch(function(err){
node.log("Module Input can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Module output to send output to edgeHub
function ModuleOutput(config) {
// Store node for further use
var node = this;
node.output = config.output;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
// React on input from node-red
node.log("Module Output created: " + node.output);
node.on('input', function (msg) {
setStatus(node, statusEnum.sent);
var messageJSON = null;
if (typeof (msg.payload) != "string") {
messageJSON = msg.payload;
} else {
//Converting string to JSON Object
messageJSON = JSON.parse(msg.payload);
}
var messageOutput = node.output;
sendMessageToEdgeHub(client, node, messageJSON, messageOutput);
});
})
.catch(function(err){
node.error("Module Output can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Module method to receive methods from IoT Hub
function ModuleMethod(config) {
// Store node for further use
var node = this;
node.method = config.method;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
var mthd = node.method;
node.log('Direct Method created: ' + mthd);
client.onMethod(mthd, function(request, response) {
// Set status
setStatus(node, statusEnum.method);
node.log('Direct Method called: ' + request.methodName);
if(request.payload) {
node.log('Method Payload:' + JSON.stringify(request.payload));
node.send({payload: request.payload,topic: "method", method: request.methodName});
}
else {
node.send({payload: null,topic: "method", method: request.methodName});
}
getResponse(node).then(function(rspns){
var responseBody;
if (typeof (rspns.response) != "string") {
// Turn message object into string
responseBody = JSON.stringify(rspns.response);
} else {
responseBody = rspns.response;
}
response.send(rspns.status, responseBody, function(err) {
if (err) {
node.log('Failed sending method response: ' + err);
} else {
node.log('Successfully sent method response.');
}
});
})
.catch(function(err){
node.error("Failed sending method response: response not received.");
});
// reset response
node.response = null;
setStatus(node, statusEnum.connected);
});
// Set method response on input
node.on('input', function (msg) {
var method = node.method;
methodResponses.push(
{method: method, response: msg.payload, status: msg.status}
);
node.log("Module Method response set through node input: " + JSON.stringify(methodResponses.find(function(m){return m.method === method})));
});
})
.catch(function(err){
node.error("Module Method can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Get module client using promise, and retry, and slow backoff
function getClient(){
var retries = 20;
var timeOut = 1000;
// Retrieve client using progressive promise to wait for module client to be opened
var promise = Promise.reject();
for(var i=1; i <= retries; i++) {
promise = promise.catch( function(){
if (edgeClient){
return edgeClient;
}
else {
throw new Error("Module Client not initiated..");
}
})
.catch(function rejectDelay(reason) {
retries++;
return new Promise(function(resolve, reject) {
setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
});
});
}
return promise;
}
// Get module twin using promise, and retry, and slow backoff
function getTwin(){
var retries = 10;
var timeOut = 1000;
// Retrieve twin using progressive promise to wait for module twin to be opened
var promise = Promise.reject();
for(var i=1; i <= retries; i++) {
promise = promise.catch( function(){
if (moduleTwin){
return moduleTwin;
}
else {
throw new Error("Module Client not initiated..");
}
})
.catch(function rejectDelay(reason) {
return new Promise(function(resolve, reject) {
setTimeout(reject.bind(null, reason), timeOut * i);
});
});
}
return promise;
}
// Get module method response using promise, and retry, and slow backoff
function getResponse(node){
var retries = 20;
var timeOut = 1000;
var m = {};
node.log("Module Method node method: " + node.method);
// Retrieve client using progressive promise to wait for module client to be opened
var promise = Promise.reject();
for(var i=1; i <= retries; i++) {
promise = promise.catch( function(){
var methodResponse = methodResponses.find(function(m){return m.method === node.method});
if (methodResponse){
// get the response and clean the array
var response = methodResponse;
node.log("Module Method response object found: " + JSON.stringify(response));
methodResponses.splice(methodResponses.findIndex(function(m){return m.method === node.method}),1);
return response;
}
else {
throw new Error("Module Method Response not initiated..");
}
})
.catch(function rejectDelay(reason) {
retries++;
return new Promise(function(resolve, reject) {
setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
});
});
}
return promise;
}
// This function just sends the incoming message to the node output adding the topic "input" and the input name.
var outputMessage = function(client, node, inputName, msg) {
client.complete(msg, function (err) {
if (err) {
node.error('Failed sending message to node output:' + err);
setStatus(node, statusEnum.error);
}
});
if (inputName === node.input){
setStatus(node, statusEnum.received);
var message = JSON.parse(msg.getBytes().toString('utf8'));
if (message) {
node.log('Processed input message:' + inputName)
// send to node output
node.send({payload: message, topic: "input", input: inputName});
}
setStatus(node, statusEnum.connected);
}
}
var setStatus = function (node, status) {
node.status({ fill: status.color, shape: "dot", text: status.text });
}
var sendMessageToEdgeHub = function (client, node, message, output) {
// Send the message to IoT Edge
if (!output)
{
output = "output";
}
node.log('Sending Message to Azure IoT Edge: ' + output + '\n Payload: ' + JSON.stringify(message));
var msg = new Message(JSON.stringify(message));
client.sendOutputEvent(output, msg, function (err, res) {
if (err) {
node.error('Error while trying to send message:' + err.toString());
setStatus(node, statusEnum.error);
} else {
node.log('Message sent.');
setStatus(node, statusEnum.connected);
}
});
}
// Registration of the client into Node-RED
RED.nodes.registerType("edgeclient", IoTEdgeClient, {
defaults: {
module: {value: ""}
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("moduletwin", ModuleTwin, {
defaults: {
name: { value: "Module Twin" }
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("moduleinput", ModuleInput, {
defaults: {
input: { value: "input1"}
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("moduleoutput", ModuleOutput, {
defaults: {
output: { value: "output1"}
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("modulemethod", ModuleMethod, {
defaults: {
method: { value: "method1"},
response: { value: "{}"}
}
});
}
更详细地说,我们有 2 件相关的事情:
function ModuleInput(config) {
// Store node for further use
var node = this;
node.input = config.input;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
// Act on module input messages
node.log("Module Input created: " + node.input);
client.on('inputMessage', function (inputName, msg) {
outputMessage(client, node, inputName, msg);
});
})
.catch(function(err){
node.log("Module Input can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
所以,每次inputMessage事件发生时,执行的代码是:
var outputMessage = function(client, node, inputName, msg) {
client.complete(msg, function (err) {
if (err) {
node.error('Failed sending message to node output:' + err);
setStatus(node, statusEnum.error);
}
});
if (inputName === node.input){
setStatus(node, statusEnum.received);
var message = JSON.parse(msg.getBytes().toString('utf8'));
if (message) {
node.log('Processed input message:' + inputName)
// send to node output
node.send({payload: message, topic: "input", input: inputName});
}
setStatus(node, statusEnum.connected);
}
}
所做的几乎是接收到的消息的 JSON.parse,没有任何更改。显然 Node-RED 会在内部添加标准内容(payload、_msgid 等),但只要属性包含在消息中,就应该转发它们。
我想这是一个大问题,属性是消息本身的一部分吗?如果不是,那就可以解释为什么我在 Node-RED 中看不到它们,而我可以使用 Azure IoT Explorer 看到它们,以防我只是让消息流向 IoT 中心。
问候
要在 Azure IoT Edge 中的模块之间转发消息并包含属性,您需要确保属性与消息负载一起保留和转发。查看您当前的设置和代码,这些属性似乎没有明确包含在转发到 Node-RED 的消息负载中。
要在转发的消息中包含属性,您可以修改
outputMessage
文件中的 azureiotedge.js
函数。您需要从传入消息中提取属性,并将其与有效负载一起包含在将消息转发到 Node-RED 时。
修改
outputMessage
函数以包含属性:
var outputMessage = function(client, node, inputName, msg) {
client.complete(msg, function (err) {
if (err) {
node.error('Failed sending message to node output:' + err);
setStatus(node, statusEnum.error);
}
});
if (inputName === node.input){
setStatus(node, statusEnum.received);
var message = JSON.parse(msg.getBytes().toString('utf8'));
if (message) {
// Extract properties from the incoming message
var properties = msg.properties;
// Include properties along with the payload
var forwardedMessage = {
payload: message,
properties: properties, // Include properties
topic: "input",
input: inputName
};
node.log('Processed input message:' + inputName)
// send to node output
node.send(forwardedMessage);
}
setStatus(node, statusEnum.connected);
}
}
输出: