流分析-如何在参考输入中处理json

问题描述 投票:0回答:1

我有一个Azure流分析(ASA)作业,该作业处理来自事件中心的设备遥测数据。该流应与sql表中的参考数据结合在一起,以使用其他设备元数据来增强每条消息。合并的条目应存储在CosmosDb中。

用于服务设备元数据的sql数据库:

CREATE TABLE [dbo].[MyTable]
(
  [DeviceId] NVARCHAR(20) NOT NULL PRIMARY KEY, 
  [MetaData] NVARCHAR(MAX) NULL   /* this stores json, which can vary per record */
)

在ASA中,我已经通过简单的查询配置了参考数据输入:

SELECT DeviceId, JSON_QUERY(MetaData) FROM [dbo].[MyTable]

而且我有执行连接的主要ASA查询:

WITH temptable AS (
SELECT * FROM [telemetry-input] TD PARTITION BY PartitionId
LEFT OUTER JOIN [metadata-input] MD
ON TD.DeviceId = MD.DeviceId
)

SELECT TD.*, MD.MetaData 
INTO [cosmos-db-output] 
FROM temptable PARTITION BY PartitionId

所有工作和合并的数据都存储在CosmosDb中。但是,来自sql的Metadata列的值被视为字符串,并以引号和转义字符存储在comos中。例如:

{ "DeviceId" : "abc1234", … , "MetaData" : "{ \"TestKey\": \"test value\" }" };

是否有一种方法可以将Metadata中的json处理并存储为适当的Json对象,即

{ "DeviceId" : "abc1234", … , "MetaData" : { "TestKey": "test value" } };
azure-sql-database azure-cosmosdb azure-stream-analytics stream-analytics
1个回答
0
投票

正如您在问题中提到的,参考json数据被视为json字符串,而不是json对象。根据我对ASA中Query Syntax的研究,没有内置函数可以将其转换。

但是,建议您使用Azure Function Cosmos DB Trigger处理创建的每个文档。请参考我的功能代码:

using System;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

namespace ProcessJson
{
    public class Class1
    {
        [FunctionName("DocumentUpdates")]
        public static void Run(
        [CosmosDBTrigger(databaseName:"db",collectionName: "item", ConnectionStringSetting = "CosmosDBConnection",LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)]
        IReadOnlyList<Document> documents,
        TraceWriter log)
        {
            log.Verbose("Start.........");
            String endpointUrl = "https://***.documents.azure.com:443/";
            String authorizationKey = "***";
            String databaseId = "db";
            String collectionId = "import";

            DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey);

            for (int i = 0; i < documents.Count; i++)
            {
                Document doc = documents[i];
                if((doc.alreadyFormat == Undefined.Value) ||(!doc.alreadyFormat)){
                   String MetaData = doc.GetPropertyValue<String>("MetaData");
                   JObject o = JObject.Parse(MetaData);

                   doc.SetPropertyValue("MetaData", o);
                   doc.SetPropertyValue("alreadyFormat", true);
                   client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(databaseId, collectionId, doc.Id), doc); 

                   log.Verbose("Update document Id " + doc.Id);

                }

            }
        }
    }
}

此外,请参考情况:Azure Cosmos DB SQL - how to unescape inner json property

希望它对您有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.