我正在使用HDFS在集群上使用Apache Spark。据我了解,HDFS正在数据节点上分发文件。因此,如果在文件系统上放置“file.txt”,它将被拆分为分区。现在我在打电话
rdd = SparkContext().textFile("hdfs://.../file.txt")
来自Apache Spark。 rdd现在自动与文件系统上的“file.txt”分区相同吗?我打电话时会发生什么
rdd.repartition(x)
其中x>那么hdfs使用的分区? Spark会在物理上重新排列hdfs上的数据以在本地工作吗?
示例:我在HDFS系统上放置了一个30GB的文本文件,该文件将其分发到10个节点上。 Will Spark a)使用相同的10个分区吗?和b)当我调用重新分区(1000)时,在群集中洗牌30GB?
当Spark从HDFS读取文件时,它会为单个输入拆分创建单个分区。输入拆分由用于读取此文件的Hadoop InputFormat
设置。例如,如果你使用textFile()
,它将是Hadoop中的TextInputFormat
,它将为单个HDFS块返回单个分区(但是分区之间的分割将在线分割,而不是精确的块分割),除非你有压缩文本文件。如果是压缩文件,您将获得单个文件的单个分区(因为压缩文本文件不可拆分)。
当你调用rdd.repartition(x)
时,它会执行从N
中的rdd
partititons到你想要的x
分区的数据混乱,分区将在循环基础上完成。
如果你有一个30GB的未压缩文本文件存储在HDFS上,那么使用默认的HDFS块大小设置(128MB)它将存储在235个块中,这意味着你从这个文件中读取的RDD将有235个分区。当您调用repartition(1000)
时,您的RDD将被标记为重新分区,但实际上只有当您将在此RDD之上执行操作时,它才会被拖拽到1000个分区(延迟执行概念)
以下是“如何将HDFS中的块作为分区加载到Spark worker中”的快照
在此图像中,4个HDFS块作为Spark分区加载到3个工作器内存中
示例:我在HDFS系统上放置了一个30GB的文本文件,它将它分发到10个节点上。
威尔·斯帕克
a)使用相同的10个分区?
Spark将相同的10个HDFS块作为分区加载到工作者内存中。我假设30 GB文件的块大小应为3 GB,以获得10个分区/块(默认配置)
b)当我呼叫重新分区(1000)时,在群集中洗牌30GB?
是的,Spark在工作节点之间对数据进行洗牌,以便在工作器内存中创建1000个分区。
注意:
HDFS Block -> Spark partition : One block can represent as One partition (by default)
Spark partition -> Workers : Many/One partitions can present in One workers
当使用spark-sql读取非分段的HDFS文件(例如镶木地板)时,DataFrame分区df.rdd.getNumPartitions
的数量取决于以下因素:
spark.default.parallelism
(大致翻译为应用程序可用的#cores)spark.sql.files.maxPartitionBytes
(默认128MB)spark.sql.files.openCostInBytes
(默认4MB)对分区数量的粗略估计是:
AveragePartitionSize ≈ min(4MB, TotalDataSize/#cores)
NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize
AveragePartitionSize ≈ 128MB
NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize
精确的计算稍微复杂一些,可以在FileSourceScanExec的代码库中找到,请参考here。
除了@ 0x0FFF如果它从HDFS作为输入文件,它将计算像这个rdd = SparkContext().textFile("hdfs://.../file.txt")
和当你做rdd.getNumPatitions
它将导致Max(2, Number of HDFS block)
。我做了很多实验并发现了这个结果。再次明确你可以做rdd = SparkContext().textFile("hdfs://.../file.txt", 400)
得到400作为分区,甚至可以通过rdd.repartition
重新分区或通过rdd.coalesce(10)
减少到10