How can I delete old folders from multiple subfolders in ADLS with an ADF pipeline using the Delete activity?

Question  Votes: 0  Answers: 2

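I am trying to delete folders older than 7 days from ADLS. The folders I want to delete are named by date and sit under the subfolders of a main folder. See the sample folder structure below (raw is my container):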

/raw/MainFolder/SubfolderA/20230430/File.csv
/raw/MainFolder/SubfolderA/20230415/File.csv
/raw/MainFolder/SubfolderA/20230410/File.csv

/raw/MainFolder/SubfolderB/20230430/File.csv
/raw/MainFolder/SubfolderB/20230420/File.csv
/raw/MainFolder/SubfolderB/20230405/File.csv

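I need to delete every date-named folder that is older than 7 days. In this example, the pipeline should delete 2 folders from SubfolderA (20230415, 20230410) and 2 folders from SubfolderB (20230420, 20230405).

How can I build a pipeline in Azure Data Factory that uses the Get Metadata and Delete activities to delete these old folders dynamically?

I created a pipeline with a Get Metadata activity and can see the folders under MainFolder in the debug output. I need help making it dynamic, so that it picks up all the date folders under each subfolder and runs a Delete activity on them.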

azure azure-data-factory azure-data-lake azure-data-lake-gen2
2 Answers

0 votes

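You can try the following approach:

  • Create a pipeline parameter, say Iteration, with the Int data type and a default value of -7.

  • Add a ForEach activity and set its Items to the expression:

    @createArray(0,1,2,3,4,5,6,7)

  • Inside the ForEach, add a Delete activity. In its source settings, point the dataset at the raw/MainFolder/ level and use a wildcard file path built with an expression such as:

    @concat('Subfolder*/', addDays(utcNow(), add(pipeline().parameters.Iteration, item()), 'yyyyMMdd'), '/*')

    The wildcard text stays outside the addDays format string, so only yyyyMMdd is interpreted as a date format.

Adjust the expression and the parameter value to match your file paths and the range of dates you want to delete.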
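To see which date folders those offsets actually reach, the arithmetic can be checked outside ADF. This is a minimal Python sketch, not ADF code: `target_dates` is a hypothetical helper, and the run date of 2023-05-02 is an assumption chosen for illustration.

```python
from datetime import datetime, timedelta

def target_dates(now, iteration=-7, items=range(0, 8)):
    """Mimic addDays(utcNow(), add(Iteration, item()), 'yyyyMMdd') for each ForEach item."""
    return [(now + timedelta(days=iteration + i)).strftime("%Y%m%d") for i in items]

# Assumed run date; in the pipeline this would be utcNow().
dates = target_dates(datetime(2023, 5, 2))
print(dates)

# A more negative parameter shifts the window further into the past.
older = target_dates(datetime(2023, 5, 2), iteration=-15)
print(older)
```

With the default of -7 the eight offsets run from 7 days ago up to today, so the parameter likely needs a more negative value (for example -15, which covers 15 to 8 days ago) to target folders older than a week. Either way, this approach only reaches a fixed window of dates.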


0 votes

I was able to achieve your requirement as follows.

This is my folder structure:

raw
    MainFolder
        SubfolderA
            20230425
                //files
            20230427
                //files
            20230429
                //files
            20230523
                //files
        SubfolderB
            20230425
                //files
            20230427
                //files
            20230429
                //files
            20230523
                //files
            

Because you want to delete folders older than 7 days, I first build an array of dates, using a ForEach over

@range(0,7)

This expression yields the array [0,1,2,3,4,5,6].

Inside the ForEach, an Append Variable activity appends each date, in yyyyMMdd format, to an array variable using the following expression:

@formatDateTime(subtractFromTime(utcNow(),item(),'Day'),'yyyyMMdd')

This produces an array holding the dates of the last 7 days (today included) in yyyyMMdd format.
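For readers who want to check what that array contains, here is a minimal Python sketch of the same computation. It is only an illustration outside ADF: `recent_dates` is a hypothetical helper and the run date of 2023-05-02 is an assumption.

```python
from datetime import datetime, timedelta, timezone

def recent_dates(now, days=7):
    """Build the list that ForEach + Append Variable produce:
    formatDateTime(subtractFromTime(utcNow(), i, 'Day'), 'yyyyMMdd') for i in range(0, days)."""
    return [(now - timedelta(days=i)).strftime("%Y%m%d") for i in range(days)]

# Assumed run date; the pipeline uses utcNow() instead.
daysarr = recent_dates(datetime(2023, 5, 2, tzinfo=timezone.utc))
print(daysarr)
# ['20230502', '20230501', '20230430', '20230429', '20230428', '20230427', '20230426']
```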

This is how my pipeline works:

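First, a Get Metadata activity gets the list of subfolders (SubfolderA, SubfolderB), and its childItems array is passed to a ForEach.

  • Inside the ForEach, use another Get Metadata activity (with @item().name in the path) to get the list of date folders.
  • Apply a Filter activity to those child items. The filter keeps the date folders whose names are not in the date array, that is, the folders that fall outside the last 7 days.
  • The Filter output is the list of folders older than 7 days, and this array has to be iterated. ADF does not currently support nested ForEach activities, so use an Execute Pipeline activity and pass it the current subfolder path and the filtered child items array.
  • In the child pipeline, iterate over the child items and run a Delete activity on each one.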
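The steps above can be traced with a small Python simulation. It is only a sketch of the control flow, not ADF code: the `store` dictionary is a hypothetical in-memory stand-in for the container (using the folders from the question), `folders_to_delete` is an illustrative helper, and the run date of 2023-05-02 is an assumption.

```python
from datetime import datetime, timedelta

# Hypothetical stand-in for raw/MainFolder: subfolder -> date-named folders.
store = {
    "SubfolderA": ["20230430", "20230415", "20230410"],
    "SubfolderB": ["20230430", "20230420", "20230405"],
}

def folders_to_delete(store, now, days=7):
    # Date array built by the first ForEach + Append Variable.
    recent = {(now - timedelta(days=i)).strftime("%Y%m%d") for i in range(days)}
    doomed = []
    for subfolder, date_folders in store.items():              # parent ForEach over subfolders
        stale = [d for d in date_folders if d not in recent]   # Filter: not(contains(...))
        for name in stale:                                     # child pipeline ForEach
            doomed.append(f"MainFolder/{subfolder}/{name}")    # path handed to Delete
    return doomed

paths = folders_to_delete(store, datetime(2023, 5, 2))
for p in paths:
    print(p)
```

With the assumed run date, the result is the four folders the question expects to be removed, while both 20230430 folders are kept.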

Use a parameterized dataset; the pipelines pass the folder path to it through a folderpath parameter.

My parent pipeline JSON:

{
    "name": "parent",
    "properties": {
        "activities": [
            {
                "name": "get subfolders",
                "type": "GetMetadata",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "sourcecsv",
                        "type": "DatasetReference",
                        "parameters": {
                            "folderpath": "MainFolder"
                        }
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "iterate subfolders",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "get subfolders",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('get subfolders').output.childItems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "get date folders",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "sourcecsv",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "folderpath": {
                                            "value": "@concat('MainFolder/',item().name)",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "childItems"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "DelimitedTextReadSettings"
                                }
                            }
                        },
                        {
                            "name": "Execute Pipeline1",
                            "type": "ExecutePipeline",
                            "dependsOn": [
                                {
                                    "activity": "Filter1",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "pipeline": {
                                    "referenceName": "child",
                                    "type": "PipelineReference"
                                },
                                "waitOnCompletion": true,
                                "parameters": {
                                    "date_folder": {
                                        "value": "@activity('Filter1').output.value",
                                        "type": "Expression"
                                    },
                                    "path": {
                                        "value": "@concat('MainFolder/',item().name)",
                                        "type": "Expression"
                                    }
                                }
                            }
                        },
                        {
                            "name": "Filter1",
                            "type": "Filter",
                            "dependsOn": [
                                {
                                    "activity": "get date folders",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "items": {
                                    "value": "@activity('get date folders').output.childItems",
                                    "type": "Expression"
                                },
                                "condition": {
                                    "value": "@not(contains(variables('daysarr'),item().name))",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(0,7)",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Append variable1",
                            "type": "AppendVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "daysarr",
                                "value": {
                                    "value": "@formatDateTime(subtractFromTime(utcNow(),item(),'Day'),'yyyyMMdd')",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "variables": {
            "counter": {
                "type": "String"
            },
            "daysarr": {
                "type": "Array"
            },
            "temp": {
                "type": "String"
            },
            "new": {
                "type": "Array"
            }
        },
        "annotations": [],
        "lastPublishTime": "2023-05-02T07:27:09Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
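Note that the Filter1 condition passes on every child item whose name is not one of the last 7 dates, so future-dated folders and folders whose names are not dates would also be sent for deletion. A stricter rule compares each name against a cutoff date. The Python sketch below only illustrates that rule under assumptions: `is_stale` is a hypothetical helper, the run date is made up, and an equivalent ADF Filter expression would need to be written and verified separately.

```python
from datetime import datetime, timedelta

def is_stale(name, now, days=7):
    """True only for yyyyMMdd names dated `days` or more days before `now`,
    which matches the window kept by the date array (today plus the previous 6 days)."""
    if len(name) != 8 or not name.isdigit():
        return False  # not a date-named folder: leave it alone
    try:
        folder_date = datetime.strptime(name, "%Y%m%d").date()
    except ValueError:
        return False  # eight digits, but not a valid calendar date
    cutoff = (now - timedelta(days=days)).date()
    return folder_date <= cutoff

run_date = datetime(2023, 5, 2)  # assumed run date
for name in ["20230415", "20230425", "20230426", "20230523", "logs"]:
    print(name, is_stale(name, run_date))
```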

Child pipeline JSON:

{
    "name": "child",
    "properties": {
        "activities": [
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.date_folder",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Delete1",
                            "type": "Delete",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "sourcecsv",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "folderpath": {
                                            "value": "@concat(pipeline().parameters.path,'/',item().name)",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "enableLogging": false,
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "recursive": true,
                                    "enablePartitionDiscovery": false
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "date_folder": {
                "type": "array"
            },
            "path": {
                "type": "string"
            }
        },
        "annotations": []
    }
}

After the pipeline run, the folders older than 7 days have been deleted.
