topic

问题描述 投票:0回答:1

我有一个非常直接的pyspark SQL应用程序(spark 2.4.4, EMR 5.29),它读取了一个模式主题、年份、计数的数据框架。

df.show()

+--------+----+------+
|   topic|year| count|
+--------+----+------+
|covid-19|2017|606498|
|covid-19|2016|454678|
|covid-19|2011| 10517|
|covid-19|2008|  6193|
|covid-19|2015|510391|
|covid-19|2013| 29551|

然后,我需要按年份排序,并将计数收集到一个列表中,使它们按年份升序排列。

df.orderBy('year').groupBy('topic').agg(collect_list('count').alias('counts'))

问题是,因为我是按年排序的,所以这个阶段使用的分区数就是我的数据集的年数。因此,我得到了一个疯狂的瓶颈阶段,300个执行器中有15个被利用,导致明显的内存溢出和磁盘溢出,最终由于在设备上没有剩余的空间来处理过度拥挤的分区而导致该阶段失败。

更有趣的是,我找到了一种规避这种情况的方法,直观上看效率低得多,但实际上确实有效,因为不会产生瓶颈。

df.groupBy('topic').pivot('year', values=range(START, FINISH)).agg(first('count')) \
    .select('topic', array([col(c) for c in range(START, FINISH)]).alias('counts'))                        

这导致了我所期望的输出,即一个按年份排序的计数数组。

有谁能解释或知道为什么会发生这种情况,或者如何最好地防止这种情况?本回答 其中 该jira 其中基本上建议在按键排序时 "添加噪声 "以避免这些倾斜相关的问题。

我认为值得一提的是,pivot方法是比添加噪声更好的解决方法,而且据我所知,每当通过具有小范围值的列进行排序时,都会有这种方法。

pyspark apache-spark-sql amazon-emr hadoop-partitioning
1个回答
2
投票

这。

可以帮助或者你通过更多的列来排序,我怀疑。但在这里,根据你的DF,似乎不是这样的情况。另外,这个问题与pyspark无关。有趣的一点,但可以解释:减少的分区需要根据年份通过collect_list容纳更多的数据,显然主题比年份更多。

© www.soinside.com 2019 - 2024. All rights reserved.