我在执行一个非常简单的 Python 脚本时遇到了问题。该脚本在 Windows 上运行正常,但在我的 Le Potato(Debian)上却无法运行:
import uuid
from azure.iot.hub import IoTHubRegistryManager
import MqttClient as mqttClient
# IoT Hub connection string (placeholder value in this post).
CONNECTION_STRING = "HostName=(...)"
# Target device for the cloud-to-device messages (placeholder value).
DEVICE_ID = "(...)"
# Registry manager created once at import time; send_message uses it.
registry_manager = IoTHubRegistryManager(CONNECTION_STRING)
def send_message(msg):
    """Send ``msg`` to ``DEVICE_ID`` as a cloud-to-device message.

    Each message is tagged with a freshly generated UUID as ``messageId``
    and with ``application/json`` as ``contentType``.
    """
    c2d_properties = {
        "messageId": str(uuid.uuid4()),
        "contentType": "application/json",
    }
    registry_manager.send_c2d_message(DEVICE_ID, msg, properties=c2d_properties)
def on_message(mqttc, obj, msg):
    """Log an incoming MQTT message and forward its payload via send_message.

    Args:
        mqttc: First callback argument (unused here).
        obj: Second callback argument (unused here).
        msg: Message object exposing ``topic`` and ``payload``.

    Any exception raised while forwarding is caught and printed, so a
    failed send does not propagate to the caller of this callback.
    """
    print("[" + msg.topic + "]: " + str(msg.payload))
    try:
        payload = msg.payload
        # str() on bytes yields the repr "b'...'", which would be forwarded
        # literally; decode bytes so the original text is sent instead.
        if isinstance(payload, (bytes, bytearray)):
            body = payload.decode("utf-8", errors="replace")
        else:
            body = str(payload)
        send_message(body)
    except Exception as e:
        print("An exception occurred:", str(e))
# Hand the callback to the MqttClient module imported above.
# NOTE(review): presumably this connects and starts receiving — MqttClient is not shown here.
mqttClient.init(on_message=on_message)
运行 python myscript.py 时,我收到如下错误:
ModuleNotFoundError: No module named 'azure.iot.hub'
我把所有东西都安装了好几次:
pip install azure-iot-hub
pip install azure-iot-device
我什至通过全新安装创建了自己的虚拟环境,并在 Windows 中尝试了相同的版本。
参考这篇 Microsoft 文档(MSDOC),我能够成功发送云到设备(C2D)消息。
先运行 pip install azure-iot-hub、pip3 install azure-iot-hub 和 pip3 install uuid 安装所需的包,
再通过 pip list 检查相应的模块是否已安装(请确认所用的 pip 与运行脚本的 python 属于同一个解释器,例如使用 python -m pip list)。代码:
import random
import uuid
from azure.iot.hub import IoTHubRegistryManager
# IoT Hub connection string.
# NOTE(review): the value looks truncated (no SharedAccessKey part); avoid hard-coding real credentials.
CONNECTION_STRING = "HostName=sampath23sa454.azure-devices.net;SharedAccessKeyName=iot"
# Target device for the cloud-to-device messages.
DEVICE_ID = "sampath23"
# Registry manager created once at import time; send_message uses it.
registry_manager = IoTHubRegistryManager(CONNECTION_STRING)
def send_message(msg):
    """Send ``msg`` to ``DEVICE_ID`` as a cloud-to-device message and report it.

    The message carries a freshly generated UUID as ``messageId`` and
    ``application/json`` as ``contentType``. The confirmation lines are
    printed only after the send call has returned.
    """
    message_id = str(uuid.uuid4())
    c2d_properties = {"messageId": message_id, "contentType": "application/json"}
    registry_manager.send_c2d_message(DEVICE_ID, msg, properties=c2d_properties)
    print("Message sent successfully!")
    print("Message content:", msg)
def on_message(mqttc, obj, msg):
    """Log an incoming MQTT message and forward its payload via send_message.

    Args:
        mqttc: First callback argument (unused here).
        obj: Second callback argument (unused here).
        msg: Message object exposing ``topic`` and ``payload``.

    Any exception raised while forwarding is caught and printed, so a
    failed send does not propagate to the caller of this callback.
    """
    print("[" + msg.topic + "]: " + str(msg.payload))
    try:
        payload = msg.payload
        # str() on bytes yields the repr "b'...'", which would be forwarded
        # literally; decode bytes so the original text is sent instead.
        if isinstance(payload, (bytes, bytearray)):
            body = payload.decode("utf-8", errors="replace")
        else:
            body = str(payload)
        send_message(body)
    except Exception as e:
        print("An exception occurred:", str(e))
class MqttClient:
    """Placeholder MQTT client whose ``init`` accepts a callback and does nothing."""

    @staticmethod
    def init(on_message):
        """Accept the ``on_message`` callback; this stub ignores it."""
        return None
# Register the message callback with the MQTT client at import time.
mqttClient = MqttClient()
mqttClient.init(on_message=on_message)

if __name__ == "__main__":
    # Send one fixed cloud-to-device message, then wait for the user.
    print("Starting the IoT Hub C2D Messaging sample...")
    send_message("Hello from the cloud!")
    input("Press Enter to stop...\n")
    print("IoT Hub C2D Messaging sample stopped")
输出:
代码:
# Register the message callback with the MQTT client at import time.
mqttClient = MqttClient()
mqttClient.init(on_message=on_message)

if __name__ == "__main__":
    print("Starting the IoT Hub C2D Messaging sample...")
    sample_readings = ["Temperature: 25°C", "Humidity: 50%", "Pressure: 1013 hPa"]
    # Send five messages, each drawn at random from the sample readings.
    for _ in range(5):
        send_message(random.choice(sample_readings))
    input("Press Enter to stop...\n")
    print("IoT Hub C2D Messaging sample stopped")
输出: