最有效的方法是并行加载spark中的许多文件?

问题描述 投票:2回答:1
[Disclaimer: While this question is somewhat specific, I think it circles a very generic issue with Hadoop/Spark.]

我需要在Spark中处理一个大型数据集(~14TB)。不进行聚合,主要是过滤。给出~30k文件(250个部分文件,每个月10年,每个部分大约200MB),我想将它们加载到RDD / DataFrame中,并根据一些任意过滤器过滤掉项目。

为了使文件列表高效(我在google dataproc / cloud存储,所以驱动程序执行通配符是非常串行且非常慢),我预先计算文件名的RDD,然后将它们加载到RDD(我正在使用avro,但文件类型不应该相关),例如

#returns an array of files to load
files = sc.textFile('/list/of/files/').collect()  

#load the files into a dataframe
documents = sqlContext.read.format('com.databricks.spark.avro').load(files)

当我这样做时,即使在50个工作集群上,似乎只有一个执行程序正在执行读取文件的工作。我已经尝试过广播文件列表并阅读了十几种不同的方法,但我似乎无法破解这个问题。

那么,是否有一种从多个文件创建一个非常大的数据帧的有效方法?在创建此RDD时,如何最好地利用所有潜在的计算能力?

这种方法在较小的集合上非常有效,但是在这个大小的情况下,我看到了大量的症状,比如没有反馈的长时间运行的进程。是否有一些宝库知识 - 除了@ zero323 :-) - 在这个规模上优化火花?

apache-spark pyspark google-cloud-dataproc
1个回答
0
投票

列出30k文件应该不是GCS的问题 - 即使一次列出多达500个文件的单个GCS列表请求每个需要1秒,所有30k文件将在一分钟左右列出。可能存在一些带有一些球形图案的极端情况使其变慢,但最近GCS连接器globbing implementation的优化可能会有所帮助。

这就是为什么它应该足够好,只需依靠默认的Spark API和globbing:

val df = sqlContext.read.avro("gs://<BUCKET>/path/to/files/")
© www.soinside.com 2019 - 2024. All rights reserved.