我想做一些非常简单的事情,我有一些非常愚蠢的挣扎。我认为这必然与对火花正在做什么的根本误解有关。我非常感谢任何帮助或解释。
我有一个非常大的(~3 TB,~300MM行,25k分区)表,在s3中保存为镶木地板,我想给某人一个小样本作为单个镶木地板文件。不幸的是,这需要永远完成,我不明白为什么。我尝试过以下方法:
tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")
然后,当那不起作用我尝试了这个,我认为应该是相同的,但我不确定。 (为了调试,我添加了print
。)
tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")
当我观看Yarn UI时,打印语句和write
都使用25k mappers。 count
花了3分钟,show
花了25分钟,write
花了大约40分钟,虽然它最终写了我正在寻找的单个文件表。
在我看来,第一行应该占据前500行并将它们合并到一个分区,然后其他行应该发生得非常快(在单个映射器/缩减器上)。谁能看到我在这里做错了什么?我被告知也许我应该使用sample
而不是limit
,但据我所知,limit
应该快得多。是对的吗?
提前感谢任何想法!
我将首先接近print
函数问题,因为它是理解spark的基础。然后limit
vs sample
。然后repartition
vs coalesce
。
print
函数以这种方式运行这么长时间的原因是因为coalesce
是一个懒惰的转换。 spark中的大多数转换都是惰性的,在调用动作之前不会对其进行求值。
操作是做事的事情,并且(大多数时候)不会返回新的数据帧。像count
,show
。它们返回一个数字和一些数据,而coalesce
返回一个带有1个分区的数据帧(有点,见下文)。
发生的事情是,每次在coalesce
数据帧上调用操作时,都要重新运行sql查询和tiny
调用。这就是他们为每次通话使用25k映射器的原因。
为了节省时间,将.cache()
方法添加到第一行(无论如何,为您的print
代码)。
然后,数据帧转换实际上在第一行执行,结果保存在spark节点的内存中。
这不会对第一行的初始查询时间产生任何影响,但至少您没有再运行该查询2次,因为结果已被缓存,然后操作可以使用该缓存的结果。
要从内存中删除它,请使用.unpersist()
方法。
现在对于您正在尝试的实际查询...
这实际上取决于数据的分区方式。如在,它是在特定领域等分区...
你在问题中提到过它,但sample
可能是正确的方法。
为什么是这样?
limit
必须搜索第一行中的500行。除非您的数据按行号(或某种递增ID)进行分区,否则前500行可以存储在任何25k分区中。
因此,火花必须搜索所有这些,直到它找到所有正确的值。不仅如此,它还必须执行额外的步骤来对数据进行排序以获得正确的顺序。
sample
只抓获500个随机值。更容易做到因为没有涉及的数据的顺序/排序,它不必搜索特定行的特定分区。
虽然limit
可以更快,但它也有它的限制。我通常只将它用于非常小的子集,如10/20行。
现在进行分区....
我认为与coalesce
的问题是它实际上改变了分区。现在我不确定这个,所以盐少了。
根据pyspark
文档:
该操作导致狭窄的依赖性,例如,如果从1000个分区到100个分区,则不会进行随机播放,而是100个新分区中的每个分区将声明10个当前分区。
因此,您的500行实际上仍将位于25k物理分区中,这些分区被spark视为1个虚拟分区。
导致洗牌(通常很糟糕)和持续使用.repartition(1).cache()
的火花记忆在这里可能是一个好主意。因为当你使用write
时,不是让25k映射器看物理分区,而应该只有1个映射器查看火花存储器中的内容。然后write
变得容易。你也在处理一个小子集,所以任何改组应该(希望)是可管理的。
显然这通常是不好的做法,并没有改变火花可能会在执行原始sql查询时运行25k映射器的事实。希望sample
能够解决这个问题。
编辑以澄清改组,repartition
和coalesce
您在4节点群集上的16个分区中有2个数据集。您想要加入它们并在16个分区中作为新数据集写入。
数据1的第1行可能位于节点1上,第1行可能位于节点4上的数据2中。
为了将这些行连接在一起,spark必须物理地移动它们中的一个或两个,然后写入新分区。
这是一个在群集周围移动,物理移动的数据。
一切都被16分区并不重要,重要的是数据位于群集的哪个位置。
data.repartition(4)
将物理地将每个节点的每组4个分区中的数据移动到每个节点1个分区。
Spark可能会将所有4个分区从节点1移动到其他3个节点,在这些节点上的新单个分区中,反之亦然。
我不认为它会这样做,但这是一个极端的例子,证明了这一点。
虽然coalesce(4)
打电话,不会移动数据,但它更聪明。相反,它认识到“我已经有每个节点4个分区和4个节点......我只是将每个节点的所有4个分区称为一个分区,然后我将有4个分区!”
因此,它不需要移动任何数据,因为它只是将现有分区组合到一个连接的分区中。
试试这个,在我的实证经验中,重新分配对这类问题更有效:
tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")
如果你对镶木地板感兴趣,你不需要把它保存为桌子就更好了:
tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")