I want to copy data from Log Analytics table A to table B in the same Log Analytics workspace. To map the hierarchical data from source to destination in Azure Data Factory, I used a data flow. Below is my configuration.
My source response is:
```json
[
    {
        "tables": [
            {
                "name": "PrimaryResult",
                "columns": [
                    { "name": "TimeGenerated", "type": "datetime" },
                    { "name": "day_of_occurrence", "type": "datetime" },
                    { "name": "request_id", "type": "string" },
                    { "name": "name", "type": "string" },
                    { "name": "url", "type": "string" },
                    { "name": "item_type", "type": "string" },
                    { "name": "cloud_role_name", "type": "string" },
                    { "name": "P995_duration", "type": "real" },
                    { "name": "P99_duration", "type": "real" },
                    { "name": "P95_duration", "type": "real" },
                    { "name": "P90_duration", "type": "real" },
                    { "name": "P75_duration", "type": "real" },
                    { "name": "P50_duration", "type": "real" },
                    { "name": "item_count", "type": "int" }
                ],
                "rows": [
                    [
                        "2024-05-06T00:00:00Z",
                        "2024-05-06T00:00:00Z",
                        "",
                        "GET /api/version",
                        "https://api.vgdev.glint.cloud.dev.microsoft/api/version",
                        "",
                        "apiserver",
                        5.8315,
                        5.234724489795918,
                        3.174455248506027,
                        2.5453802347746817,
                        1.2622889021938888,
                        1.1403031329836708,
                        18762
                    ]
                ]
            }
        ]
    }
]
```
But the Log Analytics destination accepts a JSON body in this format:
```json
[
    {
        "TimeGenerated": "2024-05-06T03:59:02.9499434Z",
        "day_of_occurrence": "2024-05-06T03:59:02.9499434Z",
        "request_id": "dfasdsad",
        "name": "gfhgfh",
        "url": "gfhgf",
        "item_type": "",
        "cloud_role_name": "fvxccx",
        "P995_duration": 2,
        "P99_duration": 1,
        "P95_duration": 6,
        "P90_duration": 8,
        "P75_duration": 9.1,
        "P50_duration": 50,
        "item_count": 11
    },
    {
        "TimeGenerated": "2024-05-06T03:59:02.9499434Z",
        "day_of_occurrence": "2024-05-06T03:59:02.9499434Z",
        "request_id": "xcxzzz",
        "name": "ewqe",
        "url": "bfvbvb",
        "item_type": "",
        "cloud_role_name": "gjghj",
        "P995_duration": 2,
        "P99_duration": 1,
        "P95_duration": 4,
        "P90_duration": 54,
        "P75_duration": 8.1,
        "P50_duration": 5,
        "item_count": 13
    }
]
```
How can I transform from the source format to the destination format? I have been unable to implement this transformation.
To achieve your requirement, you can follow the steps below. Note, however, that this approach applies only to this scenario, and you will need to add the data types manually, because the data types may not be derivable dynamically from the input JSON in a data flow.
After reading the JSON in the data flow source, take a Flatten transformation and flatten the `tables` array first, selecting only the required fields `rows` and `columns` from it.
After that, take a second Flatten transformation and flatten the `rows` array.
Now, take a Derived Column transformation and create a `new` column with the following expression:
```
replace(concat('{',replace(replace(replace(toString(mapIndex(rows,concat("'",columns[#index].name,"':",iif(in(['string','datetime'],columns[#index].type),concat("'",#item,"'"),#item)))),'[',''),']',''),'"',''),'}'),"'",'"')
```
This walks the `rows` and `columns` arrays and generates the required JSON object string for each row, like the target objects shown in the question.
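The expression above is dense, but conceptually it only pairs each row value with the column name at the same index (via `mapIndex()` with `columns[#index].name` and `#item`), quoting string and datetime values. A minimal Python sketch of that pairing, using a trimmed, partly hypothetical sample of the source response (this is only an illustration, not part of the data flow):

```python
import json

# Trimmed sample in the Log Analytics query-response shape shown above.
response = [
    {
        "tables": [
            {
                "name": "PrimaryResult",
                "columns": [
                    {"name": "TimeGenerated", "type": "datetime"},
                    {"name": "cloud_role_name", "type": "string"},
                    {"name": "P50_duration", "type": "real"},
                    {"name": "item_count", "type": "int"},
                ],
                "rows": [
                    ["2024-05-06T00:00:00Z", "apiserver", 1.14, 18762],
                ],
            }
        ]
    }
]

def rows_to_objects(response):
    # Pair each row value with the column name at the same index --
    # the same index-wise pairing the mapIndex() expression performs.
    objects = []
    for doc in response:
        for table in doc["tables"]:
            names = [c["name"] for c in table["columns"]]
            for row in table["rows"]:
                objects.append(dict(zip(names, row)))
    return objects

print(json.dumps(rows_to_objects(response), indent=2))
```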
After this, use a Select transformation and select only the `new` column.
Now, use a Parse transformation to get a JSON object from the JSON string above. Here, give the `new` column, and specify the output data types:
```
(TimeGenerated as string,
day_of_occurrence as string,
request_id as string,
name as string,
url as string,
item_type as string,
cloud_role_name as string,
P995_duration as double,
P99_duration as double,
P95_duration as double,
P90_duration as double,
P75_duration as double,
P50_duration as double,
item_count as integer)
```
You can change the data types per your requirements. Here, for the sample, I used the `double` data type; you can change it to `float` as well.
After this, the required columns will be created with the given data types.
To get the individual columns, use another Select transformation with rule-based mapping.
This will generate the required output columns.
From here, you can use a Derived Column transformation to change the decimal precision as per your requirement. Add your target as the sink, and the data will be copied.
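As a sketch of that decimal adjustment (a hypothetical row, not the data flow's own code), the effect of rounding the real-typed duration fields to two decimal places, as a derived column could do, looks like:

```python
# Hypothetical row resembling one parsed record from the source data.
row = {
    "P50_duration": 1.1403031329836708,
    "P95_duration": 3.174455248506027,
    "item_count": 18762,
}

# Round only the floating-point fields; leave integers untouched.
rounded = {k: (round(v, 2) if isinstance(v, float) else v) for k, v in row.items()}
print(rounded)
```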
Here is the data flow JSON, up to the Select transformation, for your reference:
```json
{
    "name": "Logs_dataflow",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "logs_source_json",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [],
            "transformations": [
                { "name": "flatten1" },
                { "name": "flatten2" },
                { "name": "derivedColumn1" },
                { "name": "select1" },
                { "name": "select2" },
                { "name": "parse1" }
            ],
            "scriptLines": [
                "source(output(",
                " tables as (name as string, columns as (name as string, type as string)[], rows as string[][])[]",
                " ),",
                " allowSchemaDrift: true,",
                " validateSchema: false,",
                " ignoreNoFilesFound: false,",
                " documentForm: 'arrayOfDocuments') ~> source1",
                "source1 foldDown(unroll(tables, tables),",
                " mapColumn(",
                " columns = tables.columns,",
                " rows = tables.rows",
                " ),",
                " skipDuplicateMapInputs: false,",
                " skipDuplicateMapOutputs: false) ~> flatten1",
                "flatten1 foldDown(unroll(rows, rows),",
                " mapColumn(",
                " rows,",
                " columns",
                " ),",
                " skipDuplicateMapInputs: false,",
                " skipDuplicateMapOutputs: false) ~> flatten2",
                "flatten2 derive(new = replace(concat('{',replace(replace(replace(toString(mapIndex(rows,concat(\"'\",columns[#index].name,\"':\",iif(in(['string','datetime'],columns[#index].type),concat(\"'\",#item,\"'\"),#item)))),'[',''),']',''),'\"',''),'}'),\"'\",'\"')) ~> derivedColumn1",
                "derivedColumn1 select(mapColumn(",
                " new",
                " ),",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true) ~> select1",
                "parse1 select(mapColumn(",
                " each(new,match(true()))",
                " ),",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true) ~> select2",
                "select1 parse(new = new ? (TimeGenerated as string,",
                " day_of_occurrence as string,",
                " request_id as string,",
                " name as string,",
                " url as string,",
                " item_type as string,",
                " cloud_role_name as string,",
                " P995_duration as double,",
                " P99_duration as double,",
                " P95_duration as double,",
                " P90_duration as double,",
                " P75_duration as double,",
                " P50_duration as double,",
                " item_count as integer),",
                " format: 'json',",
                " documentForm: 'singleDocument') ~> parse1"
            ]
        }
    }
}
```