在ADLS2中合并通过DataBrick准备的CSV文件

问题描述 投票:0回答:1

[在运行DataBricks代码并准备CSV文件并将其加载到ADLS2时,CSV文件被拆分为许多CSV文件并正在加载到ADLS2。

是否可以通过pyspark在ADLS2中合并这些CSV文件。

谢谢

python pyspark databricks azure-data-lake
1个回答
0
投票

是否可以通过pyspark在ADLS2中合并这些CSV文件。

据我所知,spark数据帧确实将文件分开制作。理论上,您可以使用spark.csv method来接受字符串列表作为参数。

enter image description here

>>> df = spark.read.csv('path')

然后使用df.toPandas().to_csv()方法将对象写入pandas数据帧。在这种情况下,您可以参考一些线索:Azure Data-bricks : How to read part files and save it as one file to blob?

但是,恐怕此过程无法容纳如此高的内存消耗。因此,建议您只使用os包直接进行合并。我测试了以下2个代码段,供您参考。

1st:

import os

path = '/dbfs/mnt/test/'
file_suffix = '.csv'
filtered_files = [file for file in files if file.endswith(file_suffix)]
print(filtered_files)

with open(path + 'final.csv', 'w') as final_file:
    for file in filtered_files:
        with open(file) as f:
            lines = f.readlines()
            final_file.writelines(lines[1:])

[第二:

import os

path = '/dbfs/mnt/test/'
file_suffix = '.csv'

filtered_files = [os.path.join(root, name) for root, dirs, files in os.walk(top=path , topdown=False) for name in files if name.endswith(file_suffix)]
print(filtered_files)

with open(path + 'final2.csv', 'w') as final_file:
    for file in filtered_files:
        with open(file) as f:
            lines = f.readlines()
            final_file.writelines(lines[1:])

第二个是兼容的层次结构。


另外,我在这里提供一种使用ADF复制活动将多个csv文件传输到ADLS gen2中的一个文件的方法。

请参考此doc,并在ADLS gen2源数据集中配置文件夹路径。然后,将[MergeFiles]设置为copyBehavior属性。(此外,您可以使用wildFileName之类的*.csv来排除您不想删除的文件触摸特定的文件夹)

将源文件夹中的所有文件合并到一个文件中。如果文件名如果指定,则合并的文件名是指定的名称。除此以外,这是一个自动生成的文件名。

© www.soinside.com 2019 - 2024. All rights reserved.