使用 Python SDK 创建 Azure 流分析作业时出现错误
azure.core.exceptions.HttpResponseError: (BadRequest)
请求正文中提供的 JSON 无效。请求中缺少必需的属性“datasource type”(数据源类型)。
这是我用来创建 ASA 作业的 API 调用:
# Call that the question reports as failing with
# azure.core.exceptions.HttpResponseError (BadRequest).
# NOTE(review): `client`, `resource_group_name` and `job_name123` are not
# defined in this snippet — presumably set up earlier by the author.
response = client.streaming_jobs.begin_create_or_replace(resource_group_name, job_name123, streaming_job={
"location": "East US",
"properties": {
"sku": {
# NOTE(review): lower-case "standard" here; the working example later in
# this document uses "Standard" — confirm whether the service is case-sensitive.
"name": "standard"
},
"eventsLateArrivalMaxDelayInSeconds": 1,
"jobType": "edge",
# Single stream input named "input".
"inputs": [
{
"name": "input",
"properties": {
"type": "stream",
"serialization": {
"type": "JSON",
"properties": {
# NOTE(review): a field delimiter is given alongside a JSON serialization —
# TODO confirm this property is accepted for JSON.
"fieldDelimiter": ",",
"encoding": "UTF8"
}
},
# NOTE(review): the reported error says the required "datasource type"
# property is missing; presumably this datasource element (type
# "GatewayMessageBus", empty properties) is what the request validation
# rejects — confirm against the SDK models / REST API version in use.
"datasource": {
"type": "GatewayMessageBus",
"properties": {
}
}
}
}
],
# Query that selects everything from the input named "input".
"transformation": {
"name": "samplequery",
"properties": {
"query": "select * from input"
}
},
# Storage account and container settings under the "package" key
# (credentials are masked in the post).
"package": {
"storageAccount" : {
"accountName": "*******",
"accountKey": "*******"
},
"container": "sample"
},
# Single output named "output", using the same serialization and datasource
# shape as the input above.
"outputs": [
{
"name": "output",
"properties": {
"serialization": {
"type": "JSON",
"properties": {
"fieldDelimiter": ",",
"encoding": "UTF8"
}
},
"datasource": {
"type": "GatewayMessageBus",
"properties": {
}
}
}
}
]
}
})
使用 Python SDK 创建 Azure 流分析作业会出现错误
该错误表明您的代码没有正确添加数据源属性。
您可以使用以下代码,通过 Azure Python SDK 创建流分析作业。
代码:
from azure.identity import DefaultAzureCredential
from azure.mgmt.streamanalytics import StreamAnalyticsManagementClient
# Creates (or replaces) an Azure Stream Analytics job with one IoT Hub stream
# input, one Blob storage output and a simple filtering query, then prints the
# final status of the operation.

# Replace with your Azure subscription ID and resource group name.
subscription_id = "xxxxx"
resource_group_name = "xxxxxx"
# Replace with your specific Stream Analytics job name.
job_name = "stream326"

# Set up credentials and the Stream Analytics management client.
credential = DefaultAzureCredential()
client = StreamAnalyticsManagementClient(credential, subscription_id)

# begin_create_or_replace starts a long-running operation and returns a poller;
# the job definition is passed as a plain dict.
response = client.streaming_jobs.begin_create_or_replace(
    resource_group_name,
    job_name,
    streaming_job={
        "location": "East US",
        "properties": {
            "sku": {
                "name": "Standard"
            },
            "eventsOutOfOrderPolicy": "Drop",
            "outputErrorPolicy": "Drop",
            "eventsOutOfOrderMaxDelayInSeconds": 0,
            "eventsLateArrivalMaxDelayInSeconds": 5,
            "dataLocale": "en-US",
            "compatibilityLevel": "1.0",
            # Stream input read from an IoT Hub, deserialized as UTF-8 JSON.
            "inputs": [
                {
                    "properties": {
                        "type": "Stream",
                        "datasource": {
                            "type": "Microsoft.Devices/IotHubs",
                            "properties": {
                                "iotHubNamespace": "iothub-name",
                                "sharedAccessPolicyName": "iothubowner",
                                "sharedAccessPolicyKey": "key",
                                "endpoint": "messages/events",
                                "consumerGroupName": "$Default"
                            }
                        },
                        "serialization": {
                            "type": "Json",
                            "properties": {
                                "encoding": "UTF8"
                            }
                        }
                    },
                    "name": "inputtest"
                }
            ],
            # Query routing rows with Temperature > 27 from the input to the
            # output; the names must match the input/output names used here.
            "transformation": {
                "properties": {
                    "streamingUnits": 1,
                    "query": "SELECT *INTO outputtest FROM inputtest WHERE Temperature > 27"
                },
                "name": "transformationtest"
            },
            # Blob storage output, serialized as UTF-8 JSON.
            "outputs": [
                {
                    "properties": {
                        "datasource": {
                            "type": "Microsoft.Storage/Blob",
                            "properties": {
                                "storageAccounts": [
                                    {
                                        "accountName": "your-storage-account-name",
                                        "accountKey": "your-account-key"
                                    }
                                ],
                                "container": "test",
                                "pathPattern": "",
                                "dateFormat": "yyyy/MM/dd",
                                "timeFormat": "HH"
                            }
                        },
                        "serialization": {
                            "type": "Json",
                            "properties": {
                                "encoding": "UTF8"
                            }
                        }
                    },
                    "name": "outputtest"
                }
            ],
            "functions": []
        },
        "tags": {
            "key1": "value1",
            "randomKey": "randomValue",
            "key3": "value3"
        }
    },
)

# Wait for the long-running operation to finish before reading its status;
# otherwise status() may report an intermediate state. result() raises if the
# service reports a failure.
response.result()
print(response.status())
输出:
Succeeded
门户(Azure Portal):
当我启动该作业后,数据就出现在了我的存储帐户中。
门户(Azure Portal):
参考: