将spark数据拆分为分区,然后将这些分区并行写入磁盘中

问题描述 投票:0回答:1

问题概述:假设我在AWS的EMR集群上通过Spark处理了300+ GB的数据。此数据具有三个属性,这些属性用于在Hive中使用的文件系统上进行分区:日期,小时和(假设)anotherAttr。我想将此数据写入fs,以最大程度减少写入的文件数。

我现在正在做的是获取日期,小时,anotherAttr和构成组合的行数的不同组合。我将它们收集到驱动程序的列表中,并遍历该列表,为每个组合构建一个新的DataFrame,使用行数对该DataFrame进行重新分区以使文件大小符合要求,并使用DataFrameWriter将文件写入磁盘,.orc完成它关闭。

出于组织原因,我们不使用Parquet。

此方法相当有效,并且解决了以下问题:使用Hive而不是Spark的下游团队看不到大量文件导致的性能问题。例如,如果我使用整个300 GB的DataFrame,对1000个分区(在spark中)和相关的列进行重新分区,然后将其转储到磁盘,则它们将全部并行转储,并在大约9分钟内完成整个操作。但是,对于较大的分区,最多可以容纳1000个文件,这会破坏Hive的性能。否则它会破坏某种性能,说实话也不是100%确定。我只是被要求保持尽可能低的文件数。使用我正在使用的方法,我可以将文件保持为所需的大小(无论如何都相对关闭),但是没有并行性,运行大约需要45分钟,主要是等待文件写入。

在我看来,由于某些源行和某些目标行之间存在一对一的关系,并且由于我可以将数据组织成不重叠的“文件夹”(Hive的分区),所以我应该能够以这样的方式组织我的代码/ DataFrame:我可以要求spark并行写入所有目标文件。有人对如何攻击这个有建议吗?

我测试过的东西不起作用:

  1. 使用scala并行集合开始写操作。无论使用DataFrame进行什么操作,它都无法很好地分离任务,并且某些计算机遇到大量垃圾收集问题。

  2. DataFrame.map-我试图在唯一组合的DataFrame上进行映射,并从内部进行启动写入,但是无法从map内部访问我实际需要的数据的DataFrame-DataFrame引用在执行程序上为null。

  3. DataFrame.mapPartitions-一个初学者,无法从mapPartitions内部提出我想要做的任何想法。]

  4. 'partition'一词在这里也不是特别有用,因为它既涉及按某些标准火花分割数据的概念,也涉及将数据组织在Hive的磁盘上的方式。我认为上面的用法很清楚。因此,如果我想一个完美的解决方案,那就是我可以基于三个属性创建一个具有1000个分区的DataFrame,以进行快速查询,然后从中创建另一个DataFrames集合,每个DataFrame都具有一个唯一的唯一组合这些属性将重新分区(在Spark中,但对于Hive),并根据其包含的数据大小分配了适当的分区数。大多数DataFrame都具有1个分区,少数具有最多10个分区。文件应为〜3 GB,并且我们的EMR群集具有比每个执行者更多的RAM,因此,我们不应认为这些“大”分区。

一旦创建了DataFrames列表并重新分区了每个列表,我可以要求spark将它们全部并行写入磁盘。

是否有可能在火花中出现这种情况?

[我在概念上不清楚的一件事:说我有

val x = spark.sql("select * from source")

val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")

val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")

y在什么程度上不同于z?如果我重新分配y,那么洗牌对zx有什么影响?

问题概述:假设我在AWS的EMR集群上通过Spark处理了300+ GB的数据。此数据具有用于在Hive中使用的文件系统分区的三个属性:日期,小时和(...

parallel-processing apache-spark-sql orc
1个回答
0
投票

此声明:

© www.soinside.com 2019 - 2024. All rights reserved.