ADF pipeline to load different files (CSV, Excel) into Azure SQL

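I am very new to Azure ADF. I have been given a task to build a pipeline that takes different file types (such as .csv and Excel) and loads them into Azure SQL.

The source and sink connections come from a control table.

Any help or ideas would be appreciated.

I tried the metadata-driven copy task, but it only works for one type of file; it does not work for Excel.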

azure-sql-database azure-data-factory
1 Answer

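According to this reference, the file format type cannot be parameterized in ADF.

Therefore, you cannot copy multiple types of files with the metadata-driven Copy Data tool.

Alternatively, if your file system contains only .xlsx and .csv files, you can follow these steps: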

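  1. Use a Get Metadata activity to get the list of files in the directory. Select the required directory in the dataset and add Child items to the field list. It returns all the files in the directory, as shown below: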

    Get Metadata Activity

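  2. Add a ForEach activity after the Get Metadata activity, with @activity('Get Metadata1').output.childItems as the items expression and Sequential enabled. Inside the ForEach, add an If Condition activity with the condition @contains(item().name,'.xlsx'). In the True case, add an Execute Pipeline activity that calls a second pipeline, which dynamically copies each sheet of the Excel file into Azure SQL Database.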

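    That pipeline uses the following activities to select the Excel sheets dynamically:

    • Add three variables: c1 with a default value of 0, tempc, and iserror, as shown below: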

      Variables in Pipeline

    • Add an Until activity to the pipeline. Inside it, add a Set Variable activity that sets tempc with the dynamic expression @string(add(int(variables('c1')),1)).

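    • Add a Copy activity after the Set Variable activity. As the source, select an Excel dataset with an index parameter set to the dynamic value @int(variables('c1')) and a fileName parameter set to the dynamic value @pipeline().parameters.fileName. As the sink, select a SQL dataset with a tableName parameter set to the dynamic value excel@{variables('c1')}.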

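    • On success of the Copy activity, add a Set Variable activity that sets c1 to the dynamic value @variables('tempc'). On failure of the Copy activity, add a Set Variable activity that sets iserror to the dynamic value @bool(1). Use @variables('iserror') as the expression of the Until activity, so the loop stops at the first sheet index that cannot be copied.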

    Here is the pipeline JSON for the dynamic Excel sheet copy:

    {
     "name": "excel",
     "properties": {
         "activities": [
             {
                 "name": "Until1",
                 "type": "Until",
                 "dependsOn": [],
                 "userProperties": [],
                 "typeProperties": {
                     "expression": {
                         "value": "@variables('iserror')",
                         "type": "Expression"
                     },
                     "activities": [
                         {
                             "name": "tempc",
                             "type": "SetVariable",
                             "dependsOn": [],
                             "policy": {
                                 "secureOutput": false,
                                 "secureInput": false
                             },
                             "userProperties": [],
                             "typeProperties": {
                                 "variableName": "tempc",
                                 "value": {
                                     "value": "@string(add(int(variables('c1')),1))",
                                     "type": "Expression"
                                 }
                             }
                         },
                         {
                             "name": "Copy data1",
                             "type": "Copy",
                             "dependsOn": [
                                 {
                                     "activity": "tempc",
                                     "dependencyConditions": [
                                         "Succeeded"
                                     ]
                                 }
                             ],
                             "policy": {
                                 "timeout": "0.12:00:00",
                                 "retry": 0,
                                 "retryIntervalInSeconds": 30,
                                 "secureOutput": false,
                                 "secureInput": false
                             },
                             "userProperties": [],
                             "typeProperties": {
                                 "source": {
                                     "type": "ExcelSource",
                                     "storeSettings": {
                                         "type": "AzureBlobFSReadSettings",
                                         "recursive": true,
                                         "enablePartitionDiscovery": false
                                     }
                                 },
                                 "sink": {
                                     "type": "AzureSqlSink",
                                     "writeBehavior": "insert",
                                     "sqlWriterUseTableLock": false,
                                     "tableOption": "autoCreate",
                                     "disableMetricsCollection": false
                                 },
                                 "enableStaging": false,
                                 "translator": {
                                     "type": "TabularTranslator",
                                     "typeConversion": true,
                                     "typeConversionSettings": {
                                         "allowDataTruncation": true,
                                         "treatBooleanAsNumber": false
                                     }
                                 }
                             },
                             "inputs": [
                                 {
                                     "referenceName": "Excel1",
                                     "type": "DatasetReference",
                                     "parameters": {
                                         "index": {
                                             "value": "@int(variables('c1'))",
                                             "type": "Expression"
                                         },
                                         "fileName": {
                                             "value": "@pipeline().parameters.fileName",
                                             "type": "Expression"
                                         }
                                     }
                                 }
                             ],
                             "outputs": [
                                 {
                                     "referenceName": "AzureSqlTable1",
                                     "type": "DatasetReference",
                                     "parameters": {
                                         "tableName": {
                                             "value": "excel@{variables('c1')}",
                                             "type": "Expression"
                                         }
                                     }
                                 }
                             ]
                         },
                         {
                             "name": "c1",
                             "type": "SetVariable",
                             "dependsOn": [
                                 {
                                     "activity": "Copy data1",
                                     "dependencyConditions": [
                                         "Succeeded"
                                     ]
                                 }
                             ],
                             "policy": {
                                 "secureOutput": false,
                                 "secureInput": false
                             },
                             "userProperties": [],
                             "typeProperties": {
                                 "variableName": "c1",
                                 "value": {
                                     "value": "@variables('tempc')",
                                     "type": "Expression"
                                 }
                             }
                         },
                         {
                             "name": "iserror",
                             "type": "SetVariable",
                             "dependsOn": [
                                 {
                                     "activity": "Copy data1",
                                     "dependencyConditions": [
                                         "Failed"
                                     ]
                                 }
                             ],
                             "policy": {
                                 "secureOutput": false,
                                 "secureInput": false
                             },
                             "userProperties": [],
                             "typeProperties": {
                                 "variableName": "iserror",
                                 "value": {
                                     "value": "@bool(1)",
                                     "type": "Expression"
                                 }
                             }
                         }
                     ],
                     "timeout": "0.12:00:00"
                 }
             }
         ],
         "parameters": {
             "fileName": {
                 "type": "string"
             }
         },
         "variables": {
             "c1": {
                 "type": "String",
                 "defaultValue": "0"
             },
             "iserror": {
                 "type": "Boolean"
             },
             "tempc": {
                 "type": "String"
             }
         },
         "annotations": [],
         "lastPublishTime": "2024-02-23T08:00:17Z"
     },
     "type": "Microsoft.DataFactory/factories/pipelines"
     }
    
  3. In the Execute Pipeline activity, pass the expression @item().name to the fileName parameter.

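  4. In the False case, add a Copy activity. As the source, select a delimited text dataset with a fileName parameter set to the dynamic value @item().name. As the sink, select a SQL DB dataset with a tableName parameter set to the dynamic value @replace(item().name,'.csv',''). Debug the pipeline; it copies all the CSV files and Excel sheets into the SQL database, as shown below: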

    Debug Pipeline

  5. Run the following script to list the copied tables:

    SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES
    WHERE TABLE_TYPE =  'BASE TABLE'  and TABLE_SCHEMA =  'dbo'
    

    The script returns the tables shown below:

    TABLE_SCHEMA  TABLE_NAME
    dbo           excel0
    dbo           excel11
    dbo           input
    dbo           input1

    Here is the JSON of the main pipeline:

    {
         "name": "pipeline1",
         "properties": {
             "activities": [
                 {
                     "name": "Get Metadata1",
                     "type": "GetMetadata",
                     "dependsOn": [],
                     "policy": {
                         "timeout": "0.12:00:00",
                         "retry": 0,
                         "retryIntervalInSeconds": 30,
                         "secureOutput": false,
                         "secureInput": false
                     },
                     "userProperties": [],
                     "typeProperties": {
                         "dataset": {
                             "referenceName": "DelimitedText1",
                             "type": "DatasetReference"
                         },
                         "fieldList": [
                             "childItems"
                         ],
                         "storeSettings": {
                             "type": "AzureBlobFSReadSettings",
                             "recursive": true,
                             "enablePartitionDiscovery": false
                         },
                         "formatSettings": {
                             "type": "DelimitedTextReadSettings"
                         }
                     }
                 },
                 {
                     "name": "ForEach1",
                     "type": "ForEach",
                     "dependsOn": [
                         {
                             "activity": "Get Metadata1",
                             "dependencyConditions": [
                                 "Succeeded"
                             ]
                         }
                     ],
                     "userProperties": [],
                     "typeProperties": {
                         "items": {
                             "value": "@activity('Get Metadata1').output.childItems",
                             "type": "Expression"
                         },
                         "isSequential": true,
                         "activities": [
                             {
                                 "name": "If Condition1",
                                 "type": "IfCondition",
                                 "dependsOn": [],
                                 "userProperties": [],
                                 "typeProperties": {
                                     "expression": {
                                         "value": "@contains(item().name,'.xlsx')",
                                         "type": "Expression"
                                     },
                                     "ifFalseActivities": [
                                         {
                                             "name": "Copy data1",
                                             "type": "Copy",
                                             "dependsOn": [],
                                             "policy": {
                                                 "timeout": "0.12:00:00",
                                                 "retry": 0,
                                                 "retryIntervalInSeconds": 30,
                                                 "secureOutput": false,
                                                 "secureInput": false
                                             },
                                             "userProperties": [],
                                             "typeProperties": {
                                                 "source": {
                                                     "type": "DelimitedTextSource",
                                                     "storeSettings": {
                                                         "type": "AzureBlobFSReadSettings",
                                                         "recursive": true,
                                                         "enablePartitionDiscovery": false
                                                     },
                                                     "formatSettings": {
                                                         "type": "DelimitedTextReadSettings"
                                                     }
                                                 },
                                                 "sink": {
                                                     "type": "AzureSqlSink",
                                                     "writeBehavior": "insert",
                                                     "sqlWriterUseTableLock": false,
                                                     "tableOption": "autoCreate",
                                                     "disableMetricsCollection": false
                                                 },
                                                 "enableStaging": false,
                                                 "translator": {
                                                     "type": "TabularTranslator",
                                                     "typeConversion": true,
                                                     "typeConversionSettings": {
                                                         "allowDataTruncation": true,
                                                         "treatBooleanAsNumber": false
                                                     }
                                                 }
                                             },
                                             "inputs": [
                                                 {
                                                     "referenceName": "source",
                                                     "type": "DatasetReference",
                                                     "parameters": {
                                                         "fileName": {
                                                             "value": "@item().name",
                                                             "type": "Expression"
                                                         }
                                                     }
                                                 }
                                             ],
                                             "outputs": [
                                                 {
                                                     "referenceName": "AzureSqlTable2",
                                                     "type": "DatasetReference",
                                                     "parameters": {
                                                         "tableName": {
                                                             "value": "@replace(item().name,'.csv','')",
                                                             "type": "Expression"
                                                         }
                                                     }
                                                 }
                                             ]
                                         }
                                     ],
                                     "ifTrueActivities": [
                                         {
                                             "name": "Execute Pipeline1",
                                             "type": "ExecutePipeline",
                                             "dependsOn": [],
                                             "policy": {
                                                 "secureInput": false
                                             },
                                             "userProperties": [],
                                             "typeProperties": {
                                                 "pipeline": {
                                                     "referenceName": "excel",
                                                     "type": "PipelineReference"
                                                 },
                                                 "waitOnCompletion": true,
                                                 "parameters": {
                                                     "fileName": {
                                                         "value": "@item().name",
                                                         "type": "Expression"
                                                     }
                                                 }
                                             }
                                         }
                                     ]
                                 }
                             }
                         ]
                     }
                 }
             ],
             "annotations": [],
             "lastPublishTime": "2024-02-23T08:03:18Z"
         },
         "type": "Microsoft.DataFactory/factories/pipelines"
     }
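The control flow of the two pipelines above can be traced in plain Python. This is only a rough sketch to make the loop and branch logic easier to follow; it does not call Azure Data Factory. The function names and the sheet_counts lookup are hypothetical stand-ins, and a copy of a sheet index that does not exist is assumed to behave like a failed Copy activity.

```python
def copy_excel_sheets(sheet_count):
    """Mimic the 'excel' pipeline: copy sheets 0, 1, 2, ... until a copy fails.

    sheet_count is a hypothetical stand-in for the workbook; copying a sheet
    index that does not exist is treated as a failed Copy activity.
    """
    c1 = "0"         # variable c1 (String, default "0")
    iserror = False  # variable iserror (Boolean)
    tables = []
    while True:  # Until runs its activities first, then checks the expression
        tempc = str(int(c1) + 1)         # @string(add(int(variables('c1')),1))
        if int(c1) < sheet_count:        # "Copy data1" succeeded
            tables.append(f"excel{c1}")  # sink table: excel@{variables('c1')}
            c1 = tempc                   # c1 <- @variables('tempc')
        else:                            # "Copy data1" failed
            iserror = True               # iserror <- @bool(1)
        if iserror:                      # Until expression: @variables('iserror')
            break
    return tables


def plan_copies(child_items, sheet_counts):
    """Mimic 'pipeline1': send each file to the Excel or the CSV branch."""
    tables = []
    for item in child_items:             # ForEach over childItems, sequential
        name = item["name"]
        if ".xlsx" in name:              # @contains(item().name,'.xlsx')
            tables += copy_excel_sheets(sheet_counts[name])
        else:                            # False branch: delimited-text copy
            # @replace(item().name,'.csv','')
            tables.append(name.replace(".csv", ""))
    return tables


# Hypothetical folder listing in the shape returned by childItems.
files = [
    {"name": "book.xlsx", "type": "File"},
    {"name": "input.csv", "type": "File"},
]
print(plan_copies(files, {"book.xlsx": 2}))  # ['excel0', 'excel1', 'input']
```

Two things stand out in the trace. The sink table name for Excel depends only on the sheet index, so sheets from several workbooks would land in the same excel0, excel1, ... tables. Any file whose name does not contain .xlsx goes down the CSV branch, which is why the approach assumes the folder holds only those two file types.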
    