[Disclaimer: While this question is somewhat specific, I think it circles a very generic issue with Hadoop/Spark.]
我需要在Spark中处理一个大型数据集(~14TB)。不进行聚合,主要是过滤。给出~30k文件(250个部分文件,每个月10年,每个部分大约200MB),我想将它们加载到RDD / DataFrame中,并根据一些任意过滤器过滤掉项目。
为了使文件列表高效(我在google dataproc / cloud存储,所以驱动程序执行通配符是非常串行且非常慢),我预先计算文件名的RDD,然后将它们加载到RDD(我正在使用avro,但文件类型不应该相关),例如
#returns an array of files to load
files = sc.textFile('/list/of/files/').collect()
#load the files into a dataframe
documents = sqlContext.read.format('com.databricks.spark.avro').load(files)
当我这样做时,即使在50个工作集群上,似乎只有一个执行程序正在执行读取文件的工作。我已经尝试过广播文件列表并阅读了十几种不同的方法,但我似乎无法破解这个问题。
那么,是否有一种从多个文件创建一个非常大的数据帧的有效方法?在创建此RDD时,如何最好地利用所有潜在的计算能力?
这种方法在较小的集合上非常有效,但是在这个大小的情况下,我看到了大量的症状,比如没有反馈的长时间运行的进程。是否有一些宝库知识 - 除了@ zero323 :-) - 在这个规模上优化火花?
列出30k文件应该不是GCS的问题 - 即使一次列出多达500个文件的单个GCS列表请求每个需要1秒,所有30k文件将在一分钟左右列出。可能存在一些带有一些球形图案的极端情况使其变慢,但最近GCS连接器globbing implementation的优化可能会有所帮助。
这就是为什么它应该足够好,只需依靠默认的Spark API和globbing:
val df = sqlContext.read.avro("gs://<BUCKET>/path/to/files/")