在过去的 2 天里,我试图从 Visual Code Studio 本地开发和部署我的 http 触发器,我能够做到这一点,但突然我的 http 触发器功能消失在 azure 门户中,并且当我尝试时在 VS code 工作区中也不可见开始调试。我收到以下错误。
功能代码:
import azure.functions as func
from azure.storage.blob import BlobServiceClient
from azure.iot.device import IoTHubDeviceClient, Message
import crc32c
import base64
def main(req: func.HttpRequest) -> func.HttpResponse:
# Retrieve the blob name from the HTTP request
blob_name = req.params.get('blob_name')
if not blob_name:
return func.HttpResponse(
"Please provide a 'blob_name' parameter in the query string.",
status_code=400
)
# Connect to Azure Blob Storage
blob_service_client = BlobServiceClient.from_connection_string("blob connection string")
blob_client = blob_service_client.get_blob_client(container="blob-device", blob=blob_name)
try:
# Download the blob
blob_data = blob_client.download_blob()
binary_data = blob_data.readall()
# Send the binary file to the device (pseudo-code)
send_to_device(binary_data)
return func.HttpResponse("Binary file sent to the device successfully.", status_code=200)
except Exception as e:
return func.HttpResponse(f"An error occurred: {str(e)}", status_code=500)
def send_to_device(binary_data):
# Calculate CRC32 checksum of the binary data
crc32_checksum = crc32c.crc32(binary_data)
# Convert binary data to base64 for transmission
# encoded_data = base64.b64encode(binary_data)
# Connect to IoT Hub
conn_str = "Iot Hub connect string"
device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)
try:
# Connect to the IoT Hub
device_client.connect()
# Send the binary file along with CRC32 checksum as a message to IoT Hub
msg = {
"file": binary_data,
"checksum": crc32_checksum
}
device_client.send_message(msg)
print("Binary file sent to IoT Hub successfully.")
# Receive acknowledgment or any other messages from IoT Hub
while True:
message = device_client.receive_message()
if message:
print("Received message from IoT Hub:")
print(message.data)
# Handle the received message here
# Example: If the message contains acknowledgment or status
# handle_message(message)
else:
print("No more messages from IoT Hub.")
break
except Exception as e:
print(f"An error occurred while sending data to IoT Hub: {e}")
finally:
# Disconnect from IoT Hub
device_client.disconnect()
我使用了下面的代码,它对我有用。
import azure.functions as func
import logging
from azure.storage.blob import BlobServiceClient
from azure.iot.device import IoTHubDeviceClient, Message
import crc32c
import base64
import os
import json
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
blob_name = req.params.get('blob_name')
if not blob_name:
return func.HttpResponse(
"Please provide a 'blob_name' parameter in the query string.",
status_code=400
)
else:
# Connect to Azure Blob Storage
blob_connect_str = os.environ["StorageAccountConnectionString"]
blob_service_client = BlobServiceClient.from_connection_string(blob_connect_str)
blob_client = blob_service_client.get_blob_client(container="demo-container", blob=blob_name)
try:
# Download the blob
blob_data = blob_client.download_blob()
binary_data = blob_data.readall()
# Send the binary file to the device (pseudo-code)
send_to_device(binary_data)
return func.HttpResponse("Binary file sent to the device successfully.", status_code=200)
except Exception as e:
return func.HttpResponse(f"An error occurred: {str(e)}", status_code=500)
def send_to_device(binary_data):
# Calculate CRC32 checksum of the binary data
crc32_checksum = crc32c.crc32(binary_data)
# Connect to IoT Hub
iot_conn_str = os.environ["IotHubConnectionString"]
device_client = IoTHubDeviceClient.create_from_connection_string(iot_conn_str)
try:
# Connect to the IoT Hub
device_client.connect()
# Send the binary file along with CRC32 checksum as a message to IoT Hub
msg_body = {
"file": base64.b64encode(binary_data).decode(),
"checksum": crc32_checksum
}
msg = Message(json.dumps(msg_body))
device_client.send_message(msg)
print("Binary file sent to IoT Hub successfully.")
# Receive acknowledgment or any other messages from IoT Hub
while True:
message = device_client.receive_message()
if message is None:
continue
print("Received message from IoT Hub:")
print(message.data)
break
except Exception as e:
print(f"An error occurred while sending data to IoT Hub: {e}")
finally:
# Disconnect from IoT Hub
device_client.disconnect()
我已在本地设置文件中声明了连接字符串。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
"StorageAccountConnectionString": "DefaultEndpointsProtocol=https;AccountName=afrinstore01;AccountKey=cRWkgd************==;EndpointSuffix=core.windows.net",
"IotHubConnectionString": "HostName={HostName}.azure-devices.net;DeviceId={deviceId};SharedAccessKey=3Rj*********k="
}
}
需求.txt-
azure-functions
azure-storage-blob
azure-iot-device
crc32c
我能够将消息发送到 Azure Iot Hub。