使用 Python 将遥测数据从 Azure IoT 中心提取到数字孪生错误

问题描述 投票:0回答:1

我想使用 Python 而不是 C#(我没有 C# 经验)将遥测数据从 IOT 中心发送到 Azure 数字孪生,但我无法获取。

我遵循了 Microsoft 的完整示例 (https://learn.microsoft.com/en-us/azure/digital-twins/how-to-ingest-iot-hub-data) 没有成功。

这是我的模拟设备代码。我检查了一下,它成功地将消息发送到 IoT 中心:

from azure.iot.device import IoTHubDeviceClient
import json
import random
import time

CONNECTION_STRING = "HostName=nmslhub.azure-devices.net;DeviceId=factory110503;SharedAccessKey=okok4Y2Ri3dhsW5h5JSZlijnE2SHyxPA187AOBIMVHA="


def simulate_telemetry(device_client):
    while True:
        temperature = random.randint(20, 30)
        humidity = random.randint(60, 80)

        telemetry = {
            "temperature": temperature,
            "humidity": humidity
        }

        message = json.dumps(telemetry)
        device_client.send_message(message)

        print(f"Sent message: {message}")
        time.sleep(1)

if __name__ == "__main__":
    device_client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
    device_client.connect()

    simulate_telemetry(device_client)

这是使用 python 和事件网格触发器的函数应用程序的代码(我遵循教程链接中的 C# 代码):

import os 
import logging
import json
from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient
from azure.core.exceptions import ResourceNotFoundError
import azure.functions as func

adt_instance_url = os.getenv("DIGITAL_TWINS_URL")
if adt_instance_url is None:
    logging.error("Application setting 'DIGITAL_TWINS_URL' not set")

async def main(event: func.EventGridEvent):
    try:
        # Authenticate with Digital Twins
        credential = DefaultAzureCredential()
        client = DigitalTwinsClient(adt_instance_url, credential)
        logging.info("ADT service client connection created.")

        if event is not None and event.data is not None:
            logging.info(event.data)

            # Find device ID and temperature
            device_message = event.get_json()
            device_id = device_message["systemProperties"]["iothub-connection-device-id"]
            temperature = device_message["body"]["temperature"]
            humidity = device_message["body"]["humidity"]
            
            logging.info(f"Device: {device_id} Temperature is: {temperature} Humidity is : {humidity}")

            # Update twin with device temperature
            twin_id = device_id
            patch = [
                {
                    "op": "replace",
                    "path": "/Temperature",
                    "value": temperature
                },
               {
                    "op": "replace",
                    "path": "/Humidity",
                    "value": humidity
                }
            ]
            await client.update_digital_twin(twin_id, patch)
    except ResourceNotFoundError:
        logging.error(f"Digital twin with ID {twin_id} not found.")
    except Exception as ex:
        logging.error(f"Error in ingest function: {ex}")

我还创建了事件订阅来连接 IoT 中心和 Function App。我遵循教程中的所有步骤。

当我在 VS code (Ubuntu 20.04) 上本地运行函数应用程序时,它执行成功,但中间出现此错误:

Error in ingest function: 'NoneType' object has no attribute 'startswith'

我只在整个项目中使用这 2 个文件代码。

我在 Azure 数字孪生资源管理器上运行查询,但没有看到孪生按预期更新。

我应该添加其他内容(代码)还是可以更改什么?我认为问题可能在于缺少数字孪生作为输出或物联网中心作为输入。

azure azure-functions azure-iot-hub azure-python-sdk azure-digital-twins
1个回答
0
投票

我已经使用 Python 代码测试了遥测摄取,这是我已经测试过的有效示例。

1 IoT Hub 数据模拟示例

import os
import random
import time
from datetime import date, datetime
import json
from azure.iot.device import IoTHubDeviceClient, Message

CONNECTION_STRING = "<Device Connection String>"
TEMPERATURE = 35.0
HUMIDITY = 60
MSG_TXT = '{{"Temperature": {Temperature},"Humidity": {Humidity}}}'


def run_telemetry_sample(client):
    print("IoT Hub device sending periodic messages")

    client.connect()

    while True:
        temperature = TEMPERATURE + (random.random() * 15)
        humidity = HUMIDITY + (random.random() * 20)                     
        msg_txt_formatted = MSG_TXT.format(
        Temperature=temperature, Humidity=humidity)
    
        message = Message(msg_txt_formatted, content_encoding="utf-8", content_type="application/json")
        if temperature > 30:
            message.custom_properties["temperatureAlert"] = "true"
        else:
            message.custom_properties["temperatureAlert"] = "false"
    
        print("Sending message: {}".format(message))
        client.send_message(message)
        print("Message successfully sent")
        time.sleep(30)


def main():
    print("IoT Hub Quickstart #1 - Simulated device")
    print("Press Ctrl-C to exit")

    client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)

    try:
        run_telemetry_sample(client)
    except KeyboardInterrupt:
        print("IoTHubClient sample stopped by user")
    finally:
        print("Shutting down IoTHubClient")
        client.shutdown()


if __name__ == '__main__':
    main()

请确保在上述代码中提供正确的 IoT Hub 设备连接字符串

2 Azure Event Grid 触发功能

import json
import logging
import azure.functions as func
import os
from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient

adt_instance_url = "<SDT instance URL>"
credential = DefaultAzureCredential()
client = DigitalTwinsClient(adt_instance_url, credential)

async def main(event: func.EventGridEvent):
    
    if not adt_instance_url:
        raise ValueError("Application setting \"ADT_SERVICE_URL\" not set")

    try:
        # Authenticate with Digital Twins

        logging.info("ADT service client connection created.")
        result = json.dumps({
            'id': event.id,
            'data': event.get_json(),
            'topic': event.topic,
            'subject': event.subject,
            'event_type': event.event_type,
        })
        data = json.loads(result)

        if result and data:                

            # <Find_device_ID_and_temperature>
            device_message = data["data"]
            logging.info('Python EventGrid trigger processed an event: %s', device_message)
            device_id = device_message["systemProperties"]["iothub-connection-device-id"]
            temperature = device_message["body"]["Temperature"]
            # </Find_device_ID_and_temperature>

            logging.info(f"Device:{device_id} Temperature is:{temperature}")

            # <Update_twin_with_device_temperature>
            update_twin_data = [{"op": "replace", "path": "/Temperature", "value": temperature}]
            await client.update_digital_twin(device_id, update_twin_data)
            # </Update_twin_with_device_temperature>
    except Exception as ex:
        logging.info("Error in ingest function: %s", str(ex))

请为连接字符串提供正确的 ADT 实例 URL。请注意,我已直接在代码中提供了 URL。但是,建议使用您尝试遵循的方法,并通过环境变量将 URL 传递给代码。

我已将代码发布到 Azure 函数应用程序,并分配了对函数应用程序的“Azure 数字孪生数据所有者”角色访问权限。我还配置了函数应用程序以从 IoT 中心接收遥测事件数据。请参阅分配访问角色将功能连接到 IoT 中心部分,了解执行此操作所需的步骤。

配置完所有内容后,我可以看到接收到云 Azure 功能的事件,并可以更新 Azure 数字孪生。请参考下图。

还请确保您的 Azure 数字孪生已将初始值加载到温度属性,因为我们正在使用

replace
方法补丁来更新孪生。

© www.soinside.com 2019 - 2024. All rights reserved.