Dynamically mapping schemas in Azure Data Factory

Problem description

In blob storage I have multiple CSV files without column names. Using a Get Metadata activity I fetch the list of files, and I want to load each file into its corresponding table in Azure SQL using a Copy data activity inside a loop (each table has a different schema). Below are two sample files. I have this working for a single file; I want to process multiple files in a loop.

**sample1.txt**
5|1|300|100| |101|809|4|4|4|0|0|0|0|0|0|0|0|2|0|0|0|0|-4|0|0||2020-07-06
5|1|300|100| |102|809|6|5|5|0|0|0|0|0|0|0|0|2|0|0|0|0|-5|0|0||2020-07-14
5|1|300|100| |103|809|-1|-1|-1|0|0|0|0|0|0|0|0|2|0|0|0|0|1|0|0||2020-07-05
5|1|300|100| |104|809|7|7|7|0|0|0|0|0|0|0|0|2|0|0|0|0|-7|0|0||2020-07-05
5|1|300|100| |105|809|-5|-5|-5|0|0|0|0|0|0|0|0|2|0|0|0|0|5|0|0||2021-08-18
5|1|300|100| |106|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
5|1|300|100| |107|809|8|8|8|0|0|0|0|0|0|0|0|2|0|0|0|0|-8|0|0||2020-07-14
5|1|300|100| |108|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-08
5|1|300|100| |109|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-14
5|1|300|100| |111|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-07
5|1|300|100| |112|809|4|4|4|0|0|0|0|0|0|0|0|2|0|0|0|0|-4|0|0||2020-07-05
5|1|300|100| |114|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-08

**sample2.txt**
5|1|300|100| |131|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-06
5|1|300|100| |132|809|7|7|7|0|0|0|0|0|0|0|0|2|0|0|0|0|-7|0|0||2020-07-08
5|1|300|100| |135|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
5|1|300|100| |136|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-08
5|1|300|100| |138|809|5|5|5|0|0|0|0|0|0|0|0|2|0|0|0|0|-5|0|0||2020-07-08
5|1|300|100| |139|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-07
5|1|300|100| |140|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-06
5|1|300|100| |142|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-08
5|1|300|100| |143|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-14
5|1|300|100| |146|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
Tags: sql, azure-sql-database, azure-data-factory
1 Answer

For a single headerless source file you can map the columns manually. But with multiple files the mapping has to be built dynamically, regardless of headers, and constructing the schema on every iteration with a Copy activity is difficult. Instead, use a mapping data flow, which maps the schema to the target table dynamically.

If all target SQL tables already exist, you need to loop over the array of table names and the Get Metadata file-list array at the same time.

First, build the table-name array in the same order as the file-name array. Then iterate over both arrays in the same ForEach loop; see this SO answer for how to loop over two equal-length arrays simultaneously.
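As a rough Python analogue of that pattern (in ADF itself you would loop over `@range(0, length(files))` and index both arrays with `item()`; ADF has no zip), pairing two equal-length arrays looks like this. The file and table names below are hypothetical:

```python
# Hypothetical file list (as Get Metadata childItems would return it) and a
# table-name array arranged in the same order.
files = ["sample1.txt", "sample2.txt"]
tables = ["sample1", "sample2"]

# In ADF: ForEach over @range(0, length(files)), then files[item()] and
# tables[item()]. In Python the same positional pairing is just zip().
pairs = list(zip(files, tables))
```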

Alternatively, if each target table has the same name as its source file (filename.txt), there is no need to iterate a second array: you can extract the table name from the file name itself. That is the approach I follow here.
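In plain Python, the extraction that the ADF expression `@split(item().name,'.')[0]` performs is just a split on the dot; a minimal sketch:

```python
def table_name_from_file(filename: str) -> str:
    # Equivalent of the ADF expression @split(item().name, '.')[0]:
    # everything before the first dot becomes the table name.
    return filename.split(".")[0]

print(table_name_from_file("sample1.txt"))  # sample1
```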

First, pass the Get Metadata activity's childItems array to the ForEach activity's items.

Create datasets for the source CSV files and for the target SQL tables. To iterate over files and tables, use dataset parameters: create string-type parameters and use them in the file name and table name via dynamic content, as shown below.

Source CSV dataset, with a dataset parameter for the file name:


Target SQL table dataset, with a dataset parameter for the table name:


Here, split the process into two branches: one copies data into tables that already exist, the other copies data into new tables by creating them first.

To do this, first use the following query in a Lookup activity inside the ForEach to check whether the current table already exists.

IF OBJECT_ID (N'@{split(item().name,'.')[0]}', N'U') IS NOT NULL 
   SELECT 1 AS res ELSE SELECT 0 AS res;
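The same existence check can be sketched outside ADF. The snippet below substitutes SQLite's sqlite_master catalog for SQL Server's OBJECT_ID, purely to illustrate the 1/0 result the Lookup activity returns:

```python
import sqlite3

def table_exists(conn: sqlite3.Connection, name: str) -> int:
    # SQL Server checks OBJECT_ID(N'<name>', N'U'); SQLite's equivalent is a
    # lookup in the sqlite_master catalog. Returns 1/0 like the query above.
    row = conn.execute(
        "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
        (name,),
    ).fetchone()
    return 1 if row[0] else 0

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sample1 (plant INTEGER)")
```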


Use the following expression as the If Condition activity's condition.

@equals(activity('Lookup1').output.firstRow.res, 1)

Now, use two data flows: one for tables that already exist, another for new tables.

In the data flow for existing tables, use a union transformation to add the header to the CSV data, as shown below. Take two sources: the target table dataset and the source CSV dataset. To select only the target table's columns (and no rows), give the following expression as the dynamic query of the target-table source. Create a string-type data flow parameter

table_name

and use it in the expression.

"select * from {$table_name} where 'Rakesh'='Laddu'"
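The always-false predicate (`'Rakesh'='Laddu'`) is just a trick: the query returns zero rows but still exposes the target table's column list. A small sketch of the same idea against SQLite, with a hypothetical table and columns:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sample1 (plant INTEGER, line INTEGER, qty INTEGER)")
conn.execute("INSERT INTO sample1 VALUES (5, 1, 300)")

# An always-false predicate returns no rows, but the cursor still carries
# the column metadata - exactly what the data flow needs from the target.
cur = conn.execute("SELECT * FROM sample1 WHERE 'Rakesh' = 'Laddu'")
columns = [d[0] for d in cur.description]
rows = cur.fetchall()
```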


Union by position, and add the same target table dataset as the data flow's sink.
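Conceptually, union-by-position just lines the headerless CSV rows up under the target table's column list. A stdlib sketch with hypothetical column names:

```python
import csv
import io

# Column names as they would come from the target table (hypothetical).
target_columns = ["plant", "line", "qty"]

# Headerless, pipe-delimited rows, as in the sample files.
csv_text = "5|1|300\n5|1|301\n"
reader = csv.reader(io.StringIO(csv_text), delimiter="|")

# Union by position: the i-th CSV field lands under the i-th target column.
rows = [dict(zip(target_columns, r)) for r in reader]
```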


In the pipeline, add the data flow activity to the If activity's True branch and pass the dataset parameters as shown below.


Also, in the same data flow activity, pass the same expression

@split(item().name,'.')[0]

for the data flow parameter

table_name

For new tables, take the other data flow, with the CSV dataset as the source and the SQL target table dataset as the sink. Enable the Recreate table option in the sink settings.


Add this data flow to the If activity's False branch and pass the same dataset parameter values as above.

Now execute the pipeline. For existing tables the data is inserted; for tables that don't exist, the corresponding table is created with ADF's default column names, as shown below.


Data flow JSON for existing tables:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                },
                {
                    "dataset": {
                        "referenceName": "source_csvs",
                        "type": "DatasetReference"
                    },
                    "name": "source2"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "union1"
                }
            ],
            "scriptLines": [
                "parameters{",
                "     table_name as string",
                "}",
                "source(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     isolationLevel: 'READ_UNCOMMITTED',",
                "     query: (\"select * from {$table_name} where 'Rakesh'='Laddu'\"),",
                "     format: 'query') ~> source1",
                "source(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source2",
                "source1, source2 union(byName: false)~> union1",
                "union1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}

Data flow JSON for new tables:

{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "source_csvs",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [],
            "scriptLines": [
                "source(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     recreate:true,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}

Pipeline JSON:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Get Metadata1",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "Get_csv_list",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Get Metadata1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata1').output.childItems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Lookup1",
                            "type": "Lookup",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "sqlReaderQuery": {
                                        "value": "IF OBJECT_ID (N'@{split(item().name,'.')[0]}', N'U') IS NOT NULL \n   SELECT 1 AS res ELSE SELECT 0 AS res;",
                                        "type": "Expression"
                                    },
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "dataset": {
                                    "referenceName": "AzureSqlTable1",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "table_name": "''"
                                    }
                                },
                                "firstRowOnly": true
                            }
                        },
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "Lookup1",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@equals(activity('Lookup1').output.firstRow.res, 1)",
                                    "type": "Expression"
                                },
                                "ifFalseActivities": [
                                    {
                                        "name": "Data flow2",
                                        "type": "ExecuteDataFlow",
                                        "dependsOn": [],
                                        "policy": {
                                            "timeout": "0.12:00:00",
                                            "retry": 0,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "dataflow": {
                                                "referenceName": "dataflow2",
                                                "type": "DataFlowReference",
                                                "datasetParameters": {
                                                    "source1": {
                                                        "filename": {
                                                            "value": "@item().name",
                                                            "type": "Expression"
                                                        }
                                                    },
                                                    "sink1": {
                                                        "table_name": {
                                                            "value": "@split(item().name,'.')[0]",
                                                            "type": "Expression"
                                                        }
                                                    }
                                                }
                                            },
                                            "compute": {
                                                "coreCount": 8,
                                                "computeType": "General"
                                            },
                                            "traceLevel": "Fine"
                                        }
                                    }
                                ],
                                "ifTrueActivities": [
                                    {
                                        "name": "Data flow1",
                                        "type": "ExecuteDataFlow",
                                        "dependsOn": [],
                                        "policy": {
                                            "timeout": "0.12:00:00",
                                            "retry": 0,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "dataflow": {
                                                "referenceName": "dataflow1",
                                                "type": "DataFlowReference",
                                                "parameters": {
                                                    "table_name": {
                                                        "value": "'@{split(item().name,'.')[0]}'",
                                                        "type": "Expression"
                                                    }
                                                },
                                                "datasetParameters": {
                                                    "source1": {
                                                        "table_name": {
                                                            "value": "@split(item().name,'.')[0]",
                                                            "type": "Expression"
                                                        }
                                                    },
                                                    "source2": {
                                                        "filename": {
                                                            "value": "@item().name",
                                                            "type": "Expression"
                                                        }
                                                    },
                                                    "sink1": {
                                                        "table_name": {
                                                            "value": "@split(item().name,'.')[0]",
                                                            "type": "Expression"
                                                        }
                                                    }
                                                }
                                            },
                                            "compute": {
                                                "coreCount": 8,
                                                "computeType": "General"
                                            },
                                            "traceLevel": "Fine"
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            }
        ],
        "annotations": []
    }
}