复杂的验证迭代过程来检查 ADF 中的记录数

问题描述 投票:0回答:1

Snowflake 迁移项目 - Teradata -> 提取文件 -> ADLS(BLOB) -> 通过 ADF 的 Snowflake。 提取的文件按批次放置在某个 XYZ 容器中(每个批次都有一定数量的文件,即 100 个)。前任。 Batch1 到 Batch10 文件夹,每个文件夹都有 50 个数据文件(每个文件中的标头和一些记录)和 50 个各自的控制文件(其中一行包含该特定数据文件的文件名记录计数)。 在 Batch1 文件夹中 -> 有 abc_date.csv.gz、abc_date.ctl、ijk_date.csv.gz、ijk_date.ctl 等文件。 现在我们需要进行预验证,如果数据文件与其各自的 ctl 文件之间的计数匹配,那么我们可以继续将数据加载到 Snowflake。 abc.csv.gz 有 1(标题)+100 条记录,abc.ctl 在一行中有 abc(文件名) 100 条日期。 如何在 ADF 中实现此验证?它必须迭代直到所有文件的第 10 批。

尝试使用获取元数据来获取文件列表。然后在每个内部,我都无法实现这个逻辑。有人可以帮忙吗

azure-data-factory
1个回答
0
投票

在 ADF 中,目前无法读取

.gz
文件的行数,也无法从
.ctl
文件获取数据,因为只有二进制数据集支持这两种文件。

要实现您的要求,您需要将

.gz
文件和
.ctl
文件转换为csv文件,然后您可以检查行数和文件数据。因此,首先转换 csv 文件并将其存储在 ADLS 中的临时容器中,稍后您可以将其删除。

要将子文件夹中的

.gz
转换为 csv 文件,请使用二进制数据集作为源和目标。

在源数据集中创建数据集参数。

enter image description here

提供数据集中的文件夹名称和文件名,如下所示,并选择

.gz
作为源中的压缩类型。

enter image description here

还在接收器数据集中,为文件名创建参数。这里,不要选择任何压缩类型。

enter image description here

当您将这些数据集提供给复制活动时,它会在此处询问参数值。

enter image description here

对于

.ctl
文件转换,您需要创建相同类型的数据集,但不要在源数据集中提供任何压缩类型。

您可以按照以下管道设计为上述参数提供值。

这里您需要两个管道(父管道和子管道),因为它涉及 Batch 文件夹及其文件的嵌套迭代。

遵循以下管道设计:

Parent pipeline
- Get meta data activity1 -> Get the child batch folder names list (Batch1,Batch2,..)

- For-Each1 -> Give Get meta data activity1 child items array

    - Get meta data activity2 -> Use dataset parameter for the folder name and give @item().name for that in the activity. Get all the child items files list in each Batch folder. 

    - Filter activity1 -> Give Get meta data activity2 child items array as Items and in the condition give @endswith(item().name,'.ctl')

    - Execute pipeline (Child pipeline) -> Pass the filter activity1 output array to this as an array parameter File_names and also pass @item().name (Current Batch folder name) as string parameter folder_name.

        Child pipeline
        - For-Each2 -> Give the array parameter to this.

            - Copy activity1 -> Use Binary dataset and create parameters for the folder name, file name and select .gz as Compression type. Give this to the copy actiivty source. Give pipeline parameter folder_name, @concat(split(item().name,'.')[0],'.csv.gz') in the pipeline to the dataset parameters respectively. Use a temporary location to copy this and that dataset should also be a binary dataset with a parameter for the filename. Give @concat(split(item().name,'.')[0],'.csv') to this parameter in the copy activity sink. This will generate a csv file <filename>.csv in the temporary location by which we can get the file count. Refer to the process that I have shared above.

            - Copy activity2 -> This is to convert the .ctl file into a csv file. Create two Binary datasets with same parameters like above but the compression type should be empty in the source Binary dataset. Give the folder_name parameter and @item().name to the folder and filename parameters respectively for the source Binary dataset. In the sink binary dataset, give @concat(split(item().name,'.')[0],'_ctl','.csv') to the filename parameter. This sink binary dataset should also be a temporary location.

            - Lookup1 -> Create a delimited txt dataset for the original csv file which is in temporary location. Create a dataset parameter for the file name and give the @concat(split(item().name,'.')[0],'.csv') as value for that in the lookup activity. Uncheck the firstRow in the lookup activity and this will give count of the file in its output. You can acess it usinh expression like @activity('Lookup1').output.count

            - Lookup2 -> Create another delimited txt dataset for the ctl csv file which is in same temporary location and create parameter for the filename as same like above. Give the @concat(split(item().name,'.')[0],'_ctl','.csv') as value for that in lookup and here select the FirstRow in it. This will give the first row value which contains the file row count in its output. You can access it using this expression  like @activity('Lookup2').output.firstRow.<column_name>.

            - If activity1 -> Check the two counts are same or not using above expressions.

            - True activities -> Here, proceed with your activities to copy the .gz file or .csv file in temporary location to your target.

            - False activities -> Optional but if you want you can add any activities when row counts are not same.

(或)您可以使用任何编程逻辑进行验证。您可以为此使用函数应用程序或数据块。从 ADF 调用这些进行验证,验证后,您可以使用 ADF 复制到您的目标。

© www.soinside.com 2019 - 2024. All rights reserved.