Azure Data Factory Data Flow activity

Problem description

I want to copy data from Log Analytics table A to table B in the same Log Analytics workspace. To map the hierarchical data from source to sink in Azure Data Factory, I used a data flow. Here is my configuration.

My source response is:

```json
[
  {
    "tables": [
      {
        "name": "PrimaryResult",
        "columns": [
          {
            "name": "TimeGenerated",
            "type": "datetime"
          },
          {
            "name": "day_of_occurrence",
            "type": "datetime"
          },
          {
            "name": "request_id",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "url",
            "type": "string"
          },
          {
            "name": "item_type",
            "type": "string"
          },
          {
            "name": "cloud_role_name",
            "type": "string"
          },
          {
            "name": "P995_duration",
            "type": "real"
          },
          {
            "name": "P99_duration",
            "type": "real"
          },
          {
            "name": "P95_duration",
            "type": "real"
          },
          {
            "name": "P90_duration",
            "type": "real"
          },
          {
            "name": "P75_duration",
            "type": "real"
          },
          {
            "name": "P50_duration",
            "type": "real"
          },
          {
            "name": "item_count",
            "type": "int"
          }
        ],
        "rows": [
          [
            "2024-05-06T00:00:00Z",
            "2024-05-06T00:00:00Z",
            "",
            "GET /api/version",
            "https://api.vgdev.glint.cloud.dev.microsoft/api/version",
            "",
            "apiserver",
            5.8315,
            5.234724489795918,
            3.174455248506027,
            2.5453802347746817,
            1.2622889021938888,
            1.1403031329836708,
            18762
          ]
        ]
      }
    ]
  }
]
```

But the Log Analytics destination accepts a JSON body in this format:

```json
[
  {
    "TimeGenerated": "2024-05-06T03:59:02.9499434Z",
    "day_of_occurrence": "2024-05-06T03:59:02.9499434Z",
    "request_id": "dfasdsad",
    "name": "gfhgfh",
    "url": "gfhgf",
    "item_type": "",
    "cloud_role_name": "fvxccx",
    "P995_duration": 2,
    "P99_duration": 1,
    "P95_duration": 6,
    "P90_duration": 8,
    "P75_duration": 9.1,
    "P50_duration": 50,
    "item_count": 11
  },
  {
    "TimeGenerated": "2024-05-06T03:59:02.9499434Z",
    "day_of_occurrence": "2024-05-06T03:59:02.9499434Z",
    "request_id": "xcxzzz",
    "name": "ewqe",
    "url": "bfvbvb",
    "item_type": "",
    "cloud_role_name": "gjghj",
    "P995_duration": 2,
    "P99_duration": 1,
    "P95_duration": 4,
    "P90_duration": 54,
    "P75_duration": 8.1,
    "P50_duration": 5,
    "item_count": 13
  }
]
```

How can I transform from the source format to the destination format? I have not been able to achieve this transformation.
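For reference, the reshaping being asked for is mechanical: pair each cell of every row with the column name at the same position. A minimal Python sketch of the target transformation (the function name and sample data are illustrative, not ADF code):

```python
import json

def tables_to_objects(response):
    """Convert a Log Analytics query response (tables/columns/rows)
    into a flat list of {column_name: value} objects."""
    objects = []
    for doc in response:
        for table in doc["tables"]:
            # Column names, in the same order as the cells in each row
            names = [col["name"] for col in table["columns"]]
            for row in table["rows"]:
                objects.append(dict(zip(names, row)))
    return objects

# Trimmed-down sample in the same shape as the source response above
response = [{"tables": [{"name": "PrimaryResult",
                         "columns": [{"name": "name", "type": "string"},
                                     {"name": "item_count", "type": "int"}],
                         "rows": [["GET /api/version", 18762]]}]}]
print(json.dumps(tables_to_objects(response)))
```

The data flow answer below implements the same zip inside ADF, where the lack of a direct "zip" operator makes the expression considerably more involved.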

Tags: azure-data-factory

1 Answer

To achieve your requirement, you can follow the steps below. Note, however, that this approach works only for this particular case, and you need to add the data types manually, because Data Flow may not be able to determine data types dynamically from the input JSON.

After reading the JSON in the data flow source, add a Flatten transformation and first unroll the `tables` array, then select only the required fields `rows` and `columns` from it.


After this, add a second Flatten transformation and unroll the `rows` array.

Now, add a Derived Column transformation and create a `new` column with the following expression:

```
replace(concat('{',replace(replace(replace(toString(mapIndex(rows,concat("'",columns[#index].name,"':",iif(in(['string','datetime'],columns[#index].type),concat("'",#item,"'"),#item)))),'[',''),']',''),'"',''),'}'),"'",'"')
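The expression is easier to read as imperative logic. A hypothetical Python equivalent (not ADF syntax) of what it builds per row: quote values whose column type is `string` or `datetime`, leave numeric values bare, and wrap the name/value pairs in braces.

```python
def row_to_json_string(columns, row):
    """Mirror the derived-column expression: build one JSON object
    string from parallel columns/row arrays."""
    parts = []
    for col, value in zip(columns, row):
        if col["type"] in ("string", "datetime"):
            # String-like types get quoted
            parts.append(f'"{col["name"]}":"{value}"')
        else:
            # Numeric types are emitted bare
            parts.append(f'"{col["name"]}":{value}')
    return "{" + ",".join(parts) + "}"

columns = [{"name": "name", "type": "string"},
           {"name": "P50_duration", "type": "real"},
           {"name": "item_count", "type": "int"}]
row = ["GET /api/version", 1.1403031329836708, 18762]
print(row_to_json_string(columns, row))
```

One caveat this sketch shares with the ADF expression: string values containing quotes, commas, or brackets would break the naive string assembly, so it only holds for data like the sample above.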


This uses the `rows` and `columns` arrays and generates the required JSON object string for each row.

After this, use a Select transformation and select only the `new` column.

Now, use a Parse transformation to get a JSON object from the above JSON string. Here, give the `new` column and specify the output data types:

```
(TimeGenerated as string,
        day_of_occurrence as string,
        request_id as string,
        name as string,
        url as string,
        item_type as string,
        cloud_role_name as string,
        P995_duration as double,
        P99_duration as double,
        P95_duration as double,
        P90_duration as double,
        P75_duration as double,
        P50_duration as double,
        item_count as integer)
```
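Conceptually, the Parse step deserializes each string and casts every field to its declared type. A rough Python sketch of that behavior, outside ADF (the `SCHEMA` mapping is an assumption mirroring the schema above):

```python
import json

# Declared output types from the Parse transformation above
SCHEMA = {"TimeGenerated": str, "day_of_occurrence": str, "request_id": str,
          "name": str, "url": str, "item_type": str, "cloud_role_name": str,
          "P995_duration": float, "P99_duration": float, "P95_duration": float,
          "P90_duration": float, "P75_duration": float, "P50_duration": float,
          "item_count": int}

def parse_row(json_string):
    """Deserialize one derived-column string and cast each field
    to the type declared in the schema."""
    raw = json.loads(json_string)
    return {key: SCHEMA[key](value) for key, value in raw.items()}

print(parse_row('{"name":"GET /api/version","P50_duration":"1.14","item_count":"18762"}'))
```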

You can change the data types per your requirements. Here, for the sample, I used the `double` data type; you can change it to `float` as well.

After this, the required columns will be created with the given data types.

To get the individual columns, use another Select transformation with rule-based mapping. This will generate the required output columns.

From here, you can use a Derived Column transformation to adjust the decimal values per your requirements. Use your target as the sink, and the data will be copied.

Here is the data flow JSON, up to the Select transformation, for your reference:

```json
{
    "name": "Logs_dataflow",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "logs_source_json",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [],
            "transformations": [
                {
                    "name": "flatten1"
                },
                {
                    "name": "flatten2"
                },
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "select2"
                },
                {
                    "name": "parse1"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          tables as (name as string, columns as (name as string, type as string)[], rows as string[][])[]",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'arrayOfDocuments') ~> source1",
                "source1 foldDown(unroll(tables, tables),",
                "     mapColumn(",
                "          columns = tables.columns,",
                "          rows = tables.rows",
                "     ),",
                "     skipDuplicateMapInputs: false,",
                "     skipDuplicateMapOutputs: false) ~> flatten1",
                "flatten1 foldDown(unroll(rows, rows),",
                "     mapColumn(",
                "          rows,",
                "          columns",
                "     ),",
                "     skipDuplicateMapInputs: false,",
                "     skipDuplicateMapOutputs: false) ~> flatten2",
                "flatten2 derive(new = replace(concat('{',replace(replace(replace(toString(mapIndex(rows,concat(\"'\",columns[#index].name,\"':\",iif(in(['string','datetime'],columns[#index].type),concat(\"'\",#item,\"'\"),#item)))),'[',''),']',''),'\"',''),'}'),\"'\",'\"')) ~> derivedColumn1",
                "derivedColumn1 select(mapColumn(",
                "          new",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "parse1 select(mapColumn(",
                "          each(new,match(true()))",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select2",
                "select1 parse(new = new ? (TimeGenerated as string,",
                "          day_of_occurrence as string,",
                "          request_id as string,",
                "          name as string,",
                "          url as string,",
                "          item_type as string,",
                "          cloud_role_name as string,",
                "          P995_duration as double,",
                "          P99_duration as double,",
                "          P95_duration as double,",
                "          P90_duration as double,",
                "          P75_duration as double,",
                "          P50_duration as double,",
                "          item_count as integer),",
                "     format: 'json',",
                "     documentForm: 'singleDocument') ~> parse1"
            ]
        }
    }
}
```