背景:我正在使用Zeppelin Notebook进行分析。目标是使用Pandas数据框而不是pySpark数据框。熊猫有一些我正在使用的统计功能。编写了.py文件以使用样本数据文件进行所有计算等测试。
用例:我的数据文件(CSV格式)具有模拟数据。通常,列数为45,000。目前,我有兴趣阅读前20,000列。行数通常为200。
print('1 : ' , datetime.now())
sparkDF=spark.read.csv(filePath + '/' + filename,header=True,maxColumns=50000)
print('Built sparkDF with Rows: ' , sparkDF.count() , ' Column Count: ' , len(sparkDF.columns))
print('2 : ' , datetime.now())
sparkDF1 = sparkDF.select(sparkDF.columns[0:20000])
print('Built sparkDF1 with Rows: ' , sparkDF1.count() , ' Column Count: ' , len(sparkDF1.columns))
print('3 : ' , datetime.now())
exposures = sparkDF1.toPandas()
print('Built exposures with Rows: ' , exposures.shape[0] , ' Column Count: ' , exposures.shape[1])
print('4 : ' , datetime.now())
1 : 2019-10-17 16:34:06.725326
Built sparkDF with Rows: 107 Column Count: 40002
2 : 2019-10-17 16:40:04.171931
Built sparkDF1 with Rows: 107 Column Count: 20001
3 : 2019-10-17 16:51:12.465739
Built exposures with Rows: 107 Column Count: 20001
4 : 2019-10-17 16:58:25.65423
问题:
建立数据帧的时间相当长。
我必须处理2900个此类文件。需要一些建议和替代方案。
我看到您正在从文件路径读取多个文件。为了优化此过程,您可以执行的操作是读取.csv
文件的整个目录,而不是一次读取一个文件(只要所有文件的架构都相同,看起来像您的情况):
file_path = "hdfs://path/to/multiple/files/"
df = (
spark.read.format("com.databricks.spark.csv")
.options(header="true", inferSchema="true")
.load(file_path)
)
pdf = df.toPandas()
这将在一定程度上优化您的I / O。即使同一文件夹中有10个文件或200个文件或2900个文件,与单独读取一个文件相比,读取速度也会更快。
注意:上述解决方案不适用于嵌套文件夹。