将spark数据帧写入单个镶木地板文件

问题描述 投票:2回答:2

我想做一些非常简单的事情,我有一些非常愚蠢的挣扎。我认为这必然与对火花正在做什么的根本误解有关。我非常感谢任何帮助或解释。

我有一个非常大的(~3 TB,~300MM行,25k分区)表,在s3中保存为镶木地板,我想给某人一个小样本作为单个镶木地板文件。不幸的是,这需要永远完成,我不明白为什么。我尝试过以下方法:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

然后,当那不起作用我尝试了这个,我认为应该是相同的,但我不确定。 (为了调试,我添加了print。)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

当我观看Yarn UI时,打印语句和write都使用25k mappers。 count花了3分钟,show花了25分钟,write花了大约40分钟,虽然它最终写了我正在寻找的单个文件表。

在我看来,第一行应该占据前500行并将它们合并到一个分区,然后其他行应该发生得非常快(在单个映射器/缩减器上)。谁能看到我在这里做错了什么?我被告知也许我应该使用sample而不是limit,但据我所知,limit应该快得多。是对的吗?

提前感谢任何想法!

apache-spark pyspark pyspark-sql
2个回答
3
投票

我将首先接近print函数问题,因为它是理解spark的基础。然后limit vs sample。然后repartition vs coalesce

print函数以这种方式运行这么长时间的原因是因为coalesce是一个懒惰的转换。 spark中的大多数转换都是惰性的,在调用动作之前不会对其进行求值。

操作是做事的事情,并且(大多数时候)不会返回新的数据帧。像countshow。它们返回一个数字和一些数据,而coalesce返回一个带有1个分区的数据帧(有点,见下文)。

发生的事情是,每次在coalesce数据帧上调用操作时,都要重新运行sql查询和tiny调用。这就是他们为每次通话使用25k映射器的原因。

为了节省时间,将.cache()方法添加到第一行(无论如何,为您的print代码)。

然后,数据帧转换实际上在第一行执行,结果保存在spark节点的内存中。

这不会对第一行的初始查询时间产生任何影响,但至少您没有再运行该查询2次,因为结果已被缓存,然后操作可以使用该缓存的结果。

要从内存中删除它,请使用.unpersist()方法。

现在对于您正在尝试的实际查询...

这实际上取决于数据的分区方式。如在,它是在特定领域等分区...

你在问题中提到过它,但sample可能是正确的方法。

为什么是这样?

limit必须搜索第一行中的500行。除非您的数据按行号(或某种递增ID)进行分区,否则前500行可以存储在任何25k分区中。

因此,火花必须搜索所有这些,直到它找到所有正确的值。不仅如此,它还必须执行额外的步骤来对数据进行排序以获得正确的顺序。

sample只抓获500个随机值。更容易做到因为没有涉及的数据的顺序/排序,它不必搜索特定行的特定分区。

虽然limit可以更快,但它也有它的限制。我通常只将它用于非常小的子集,如10/20行。

现在进行分区....

我认为与coalesce的问题是它实际上改变了分区。现在我不确定这个,所以盐少了。

根据pyspark文档:

该操作导致狭窄的依赖性,例如,如果从1000个分区到100个分区,则不会进行随机播放,而是100个新分区中的每个分区将声明10个当前分区。

因此,您的500行实际上仍将位于25k物理分区中,这些分区被spark视为1个虚拟分区。

导致洗牌(通常很糟糕)和持续使用.repartition(1).cache()的火花记忆在这里可能是一个好主意。因为当你使用write时,不是让25k映射器看物理分区,而应该只有1个映射器查看火花存储器中的内容。然后write变得容易。你也在处理一个小子集,所以任何改组应该(希望)是可管理的。

显然这通常是不好的做法,并没有改变火花可能会在执行原始sql查询时运行25k映射器的事实。希望sample能够解决这个问题。

编辑以澄清改组,repartitioncoalesce

您在4节点群集上的16个分区中有2个数据集。您想要加入它们并在16个分区中作为新数据集写入。

数据1的第1行可能位于节点1上,第1行可能位于节点4上的数据2中。

为了将这些行连接在一起,spark必须物理地移动它们中的一个或两个,然后写入新分区。

这是一个在群集周围移动,物理移动的数据。

一切都被16分区并不重要,重要的是数据位于群集的哪个位置。

data.repartition(4)将物理地将每个节点的每组4个分区中的数据移动到每个节点1个分区。

Spark可能会将所有4个分区从节点1移动到其他3个节点,在这些节点上的新单个分区中,反之亦然。

我不认为它会这样做,但这是一个极端的例子,证明了这一点。

虽然coalesce(4)打电话,不会移动数据,但它更聪明。相反,它认识到“我已经有每个节点4个分区和4个节点......我只是将每个节点的所有4个分区称为一个分区,然后我将有4个分区!”

因此,它不需要移动任何数据,因为它只是将现有分区组合到一个连接的分区中。


0
投票

试试这个,在我的实证经验中,重新分配对这类问题更有效:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")

如果你对镶木地板感兴趣,你不需要把它保存为桌子就更好了:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")
© www.soinside.com 2019 - 2024. All rights reserved.