我想使用 Python 而不是 C#(我没有 C# 经验)将遥测数据从 IOT 中心发送到 Azure 数字孪生,但我无法获取。
我遵循了 Microsoft 的完整示例 (https://learn.microsoft.com/en-us/azure/digital-twins/how-to-ingest-iot-hub-data) 没有成功。
这是我的模拟设备代码。我检查了一下,它成功地将消息发送到 IoT 中心:
from azure.iot.device import IoTHubDeviceClient
import json
import random
import time
CONNECTION_STRING = "HostName=nmslhub.azure-devices.net;DeviceId=factory110503;SharedAccessKey=okok4Y2Ri3dhsW5h5JSZlijnE2SHyxPA187AOBIMVHA="
def simulate_telemetry(device_client):
while True:
temperature = random.randint(20, 30)
humidity = random.randint(60, 80)
telemetry = {
"temperature": temperature,
"humidity": humidity
}
message = json.dumps(telemetry)
device_client.send_message(message)
print(f"Sent message: {message}")
time.sleep(1)
if __name__ == "__main__":
device_client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
device_client.connect()
simulate_telemetry(device_client)
这是使用 python 和事件网格触发器的函数应用程序的代码(我遵循教程链接中的 C# 代码):
import os
import logging
import json
from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient
from azure.core.exceptions import ResourceNotFoundError
import azure.functions as func
adt_instance_url = os.getenv("DIGITAL_TWINS_URL")
if adt_instance_url is None:
logging.error("Application setting 'DIGITAL_TWINS_URL' not set")
async def main(event: func.EventGridEvent):
try:
# Authenticate with Digital Twins
credential = DefaultAzureCredential()
client = DigitalTwinsClient(adt_instance_url, credential)
logging.info("ADT service client connection created.")
if event is not None and event.data is not None:
logging.info(event.data)
# Find device ID and temperature
device_message = event.get_json()
device_id = device_message["systemProperties"]["iothub-connection-device-id"]
temperature = device_message["body"]["temperature"]
humidity = device_message["body"]["humidity"]
logging.info(f"Device: {device_id} Temperature is: {temperature} Humidity is : {humidity}")
# Update twin with device temperature
twin_id = device_id
patch = [
{
"op": "replace",
"path": "/Temperature",
"value": temperature
},
{
"op": "replace",
"path": "/Humidity",
"value": humidity
}
]
await client.update_digital_twin(twin_id, patch)
except ResourceNotFoundError:
logging.error(f"Digital twin with ID {twin_id} not found.")
except Exception as ex:
logging.error(f"Error in ingest function: {ex}")
我还创建了事件订阅来连接 IoT 中心和 Function App。我遵循教程中的所有步骤。
当我在 VS code (Ubuntu 20.04) 上本地运行函数应用程序时,它执行成功,但中间出现此错误:
Error in ingest function: 'NoneType' object has no attribute 'startswith'
我只在整个项目中使用这 2 个文件代码。
我在 Azure 数字孪生资源管理器上运行查询,但没有看到孪生按预期更新。
我应该添加其他内容(代码)还是可以更改什么?我认为问题可能在于缺少数字孪生作为输出或物联网中心作为输入。
我已经使用 Python 代码测试了遥测摄取,这是我已经测试过的有效示例。
1 IoT Hub 数据模拟示例
import os
import random
import time
from datetime import date, datetime
import json
from azure.iot.device import IoTHubDeviceClient, Message
CONNECTION_STRING = "<Device Connection String>"
TEMPERATURE = 35.0
HUMIDITY = 60
MSG_TXT = '{{"Temperature": {Temperature},"Humidity": {Humidity}}}'
def run_telemetry_sample(client):
print("IoT Hub device sending periodic messages")
client.connect()
while True:
temperature = TEMPERATURE + (random.random() * 15)
humidity = HUMIDITY + (random.random() * 20)
msg_txt_formatted = MSG_TXT.format(
Temperature=temperature, Humidity=humidity)
message = Message(msg_txt_formatted, content_encoding="utf-8", content_type="application/json")
if temperature > 30:
message.custom_properties["temperatureAlert"] = "true"
else:
message.custom_properties["temperatureAlert"] = "false"
print("Sending message: {}".format(message))
client.send_message(message)
print("Message successfully sent")
time.sleep(30)
def main():
print("IoT Hub Quickstart #1 - Simulated device")
print("Press Ctrl-C to exit")
client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
try:
run_telemetry_sample(client)
except KeyboardInterrupt:
print("IoTHubClient sample stopped by user")
finally:
print("Shutting down IoTHubClient")
client.shutdown()
if __name__ == '__main__':
main()
请确保在上述代码中提供正确的 IoT Hub 设备连接字符串
2 Azure Event Grid 触发功能
import json
import logging
import azure.functions as func
import os
from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient
adt_instance_url = "<SDT instance URL>"
credential = DefaultAzureCredential()
client = DigitalTwinsClient(adt_instance_url, credential)
async def main(event: func.EventGridEvent):
if not adt_instance_url:
raise ValueError("Application setting \"ADT_SERVICE_URL\" not set")
try:
# Authenticate with Digital Twins
logging.info("ADT service client connection created.")
result = json.dumps({
'id': event.id,
'data': event.get_json(),
'topic': event.topic,
'subject': event.subject,
'event_type': event.event_type,
})
data = json.loads(result)
if result and data:
# <Find_device_ID_and_temperature>
device_message = data["data"]
logging.info('Python EventGrid trigger processed an event: %s', device_message)
device_id = device_message["systemProperties"]["iothub-connection-device-id"]
temperature = device_message["body"]["Temperature"]
# </Find_device_ID_and_temperature>
logging.info(f"Device:{device_id} Temperature is:{temperature}")
# <Update_twin_with_device_temperature>
update_twin_data = [{"op": "replace", "path": "/Temperature", "value": temperature}]
await client.update_digital_twin(device_id, update_twin_data)
# </Update_twin_with_device_temperature>
except Exception as ex:
logging.info("Error in ingest function: %s", str(ex))
请为连接字符串提供正确的 ADT 实例 URL。请注意,我已直接在代码中提供了 URL。但是,建议使用您尝试遵循的方法,并通过环境变量将 URL 传递给代码。
我已将代码发布到 Azure 函数应用程序,并分配了对函数应用程序的“Azure 数字孪生数据所有者”角色访问权限。我还配置了函数应用程序以从 IoT 中心接收遥测事件数据。请参阅分配访问角色和将功能连接到 IoT 中心部分,了解执行此操作所需的步骤。
配置完所有内容后,我可以看到接收到云 Azure 功能的事件,并可以更新 Azure 数字孪生。请参考下图。
还请确保您的 Azure 数字孪生已将初始值加载到温度属性,因为我们正在使用
replace
方法补丁来更新孪生。