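I am new to Azure ADF. I have been given a task to build a pipeline that takes different file types (such as `.csv` and Excel) and loads them into Azure SQL.
The source and sink connections come from a control table.
Any help or ideas would be appreciated.
I tried the metadata-driven copy task, but it only works for one file type and does not work for Excel.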
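According to this, the file format type cannot be parameterized in ADF.
So you cannot copy multiple file types with the metadata-driven copy data tool.
Alternatively, if your file system contains only `.xlsx` and `.csv` files, you can follow the steps below: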
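Use a Get Metadata activity to get the list of files in the directory. In the dataset, select the required directory, and add `Child items` to the field list. The activity returns all the files in the directory.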
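Add a ForEach activity after the Get Metadata activity, with `@activity('Get Metadata1').output.childItems` as the items expression and `Sequential` enabled. Inside the ForEach, add an If Condition activity with the condition `@contains(item().name,'.xlsx')`. In the True case, add an Execute Pipeline activity. It calls a second pipeline that dynamically copies each sheet of the Excel file to Azure SQL Database.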
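To make the branching concrete, here is a minimal Python sketch of how the ForEach and If Condition route each file. It only models the expression `@contains(item().name,'.xlsx')`; the file names and the labels `excel_pipeline` and `csv_copy` are made up for illustration and are not ADF names.

```python
def route(file_name: str) -> str:
    """Model the If condition @contains(item().name,'.xlsx')."""
    # contains() on a string is a case-sensitive substring test,
    # which Python's `in` operator mirrors here.
    return "excel_pipeline" if ".xlsx" in file_name else "csv_copy"


# Hypothetical childItems output of the Get Metadata activity.
child_items = [
    {"name": "input.csv", "type": "File"},
    {"name": "book.xlsx", "type": "File"},
    {"name": "input1.csv", "type": "File"},
]

# With Sequential enabled, ForEach handles the items one at a time, in order.
routes = {item["name"]: route(item["name"]) for item in child_items}
print(routes)
```

Note that everything that is not matched as `.xlsx` falls into the False branch, which is why this approach assumes the directory holds only `.xlsx` and `.csv` files.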
That pipeline uses the following activities to select the Excel sheets dynamically:
Add three variables: `c1` with the default value `0`, `tempc`, and `iserror`.
Add an Until activity to the pipeline and, inside it, a Set Variable activity that sets `tempc` to the dynamic expression `@string(add(int(variables('c1')),1))`.
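Add a Copy activity after the Set Variable activity. As the source, select an Excel dataset with the parameter `index` set to the dynamic value `@int(variables('c1'))` and the parameter `fileName` set to `@pipeline().parameters.fileName`. As the sink, select a SQL dataset with the parameter `tableName` set to the dynamic value `excel@{variables('c1')}`.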
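On success of the Copy data activity, add a Set Variable activity that sets `c1` to `@variables('tempc')`. On failure of the Copy data activity, add a Set Variable activity that sets `iserror` to `@bool(1)`. Use `@variables('iserror')` as the expression of the Until activity.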
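The loop logic is easier to follow as ordinary code. The Python sketch below simulates the Until activity under one assumption: the Copy activity fails as soon as the sheet index goes past the last sheet. `sheet_count` is a stand-in for the real workbook and is not something ADF exposes.

```python
def copy_all_sheets(sheet_count: int) -> list[str]:
    """Simulate the Until loop; returns the sink table names that get written."""
    c1 = "0"         # variable c1, default value "0"
    tempc = ""       # variable tempc
    iserror = False  # variable iserror
    tables = []

    while True:  # Until runs its activities first, then checks the expression
        tempc = str(int(c1) + 1)         # @string(add(int(variables('c1')),1))
        index = int(c1)                  # dataset parameter index
        if index < sheet_count:          # assumed: Copy data succeeds
            tables.append(f"excel{c1}")  # sink tableName excel@{variables('c1')}
            c1 = tempc                   # success path: c1 takes the value of tempc
        else:                            # assumed: Copy data fails, no such sheet
            iserror = True               # failure path: iserror set to @bool(1)
        if iserror:                      # Until expression @variables('iserror')
            break
    return tables


print(copy_all_sheets(2))
```

The extra `tempc` variable is there because, as far as I know, a Set Variable activity cannot reference the variable it is setting, so the increment has to go through a second variable. Also keep in mind that any Copy failure ends the loop, not only a missing sheet.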
Here is the pipeline JSON for the dynamic Excel sheet copy:
{
"name": "excel",
"properties": {
"activities": [
{
"name": "Until1",
"type": "Until",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@variables('iserror')",
"type": "Expression"
},
"activities": [
{
"name": "tempc",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "tempc",
"value": {
"value": "@string(add(int(variables('c1')),1))",
"type": "Expression"
}
}
},
{
"name": "Copy data1",
"type": "Copy",
"dependsOn": [
{
"activity": "tempc",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "ExcelSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
}
},
"sink": {
"type": "AzureSqlSink",
"writeBehavior": "insert",
"sqlWriterUseTableLock": false,
"tableOption": "autoCreate",
"disableMetricsCollection": false
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "Excel1",
"type": "DatasetReference",
"parameters": {
"index": {
"value": "@int(variables('c1'))",
"type": "Expression"
},
"fileName": {
"value": "@pipeline().parameters.fileName",
"type": "Expression"
}
}
}
],
"outputs": [
{
"referenceName": "AzureSqlTable1",
"type": "DatasetReference",
"parameters": {
"tableName": {
"value": "excel@{variables('c1')}",
"type": "Expression"
}
}
}
]
},
{
"name": "c1",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Copy data1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "c1",
"value": {
"value": "@variables('tempc')",
"type": "Expression"
}
}
},
{
"name": "iserror",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Copy data1",
"dependencyConditions": [
"Failed"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "iserror",
"value": {
"value": "@bool(1)",
"type": "Expression"
}
}
}
],
"timeout": "0.12:00:00"
}
}
],
"parameters": {
"fileName": {
"type": "string"
}
},
"variables": {
"c1": {
"type": "String",
"defaultValue": "0"
},
"iserror": {
"type": "Boolean"
},
"tempc": {
"type": "String"
}
},
"annotations": [],
"lastPublishTime": "2024-02-23T08:00:17Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
In the Execute Pipeline activity, pass the expression `@item().name` to the `fileName` parameter.
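In the False case, add a Copy data activity. As the source, select a delimited text dataset with the parameter `fileName` set to `@item().name`. As the sink, select a SQL DB dataset with the parameter `tableName` set to `@replace(item().name,'.csv','')`. Debug the pipeline; it copies all the CSV files and Excel sheets to the SQL database.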
Run the following script to list the copied tables:
SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES
WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'dbo'
The script returns the tables shown below:
TABLE_SCHEMA | TABLE_NAME |
---|---|
dbo | excel0 |
dbo | excel11 |
dbo | input |
dbo | input1 |
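The table names come from the two sink expressions. Here is a small Python sketch of both; the file names are hypothetical and the helper functions are mine, not part of ADF.

```python
def csv_table_name(file_name: str) -> str:
    """Model @replace(item().name,'.csv','') used for the CSV sink."""
    return file_name.replace(".csv", "")


def excel_table_name(sheet_index: int) -> str:
    """Model excel@{variables('c1')} used for the Excel sink."""
    return f"excel{sheet_index}"


# Hypothetical CSV file names: each file gets a table named after the file.
csv_tables = [csv_table_name(name) for name in ["input.csv", "input1.csv"]]

# The Excel table name depends only on the sheet index, not on the file name.
excel_tables = [excel_table_name(i) for i in range(2)]

print(csv_tables, excel_tables)
```

Because the Excel table name does not include the file name, two workbooks in the same directory would map to the same `excel0`, `excel1`, ... tables. If that matters in your case, you could include the file name in the `tableName` expression.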
Here is the JSON of the main pipeline:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Get Metadata1",
"type": "GetMetadata",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "DelimitedText1",
"type": "DatasetReference"
},
"fieldList": [
"childItems"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Get Metadata1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Get Metadata1').output.childItems",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "If Condition1",
"type": "IfCondition",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@contains(item().name,'.xlsx')",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "Copy data1",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"sink": {
"type": "AzureSqlSink",
"writeBehavior": "insert",
"sqlWriterUseTableLock": false,
"tableOption": "autoCreate",
"disableMetricsCollection": false
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "source",
"type": "DatasetReference",
"parameters": {
"fileName": {
"value": "@item().name",
"type": "Expression"
}
}
}
],
"outputs": [
{
"referenceName": "AzureSqlTable2",
"type": "DatasetReference",
"parameters": {
"tableName": {
"value": "@replace(item().name,'.csv','')",
"type": "Expression"
}
}
}
]
}
],
"ifTrueActivities": [
{
"name": "Execute Pipeline1",
"type": "ExecutePipeline",
"dependsOn": [],
"policy": {
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"pipeline": {
"referenceName": "excel",
"type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
"fileName": {
"value": "@item().name",
"type": "Expression"
}
}
}
}
]
}
}
]
}
}
],
"annotations": [],
"lastPublishTime": "2024-02-23T08:03:18Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}