PySpark数据帧:groupBy返回“广播大小为1591.9 KiB的大型任务二进制文件”

问题描述 投票:0回答:1

我的数据:

+-----+-----------+
|  id |       date|
+-----+-----------+
|id_1 |2015-09-03 |
|id_1 |2015-09-03 |
|id_2 |2015-09-03 |
|id_2 |2015-09-04 |
|id_2 |2015-09-06 |
+-----+-----------+

我正在尝试为每组“ id”收集日期集(然后计算出连续日期的最大数量)。

+-----+------------------------------------+
|  id |                              date  |
+-----+------------------------------------+
|id_1 | [2015-09-03]                       |
|id_2 | [2015-09-03,2015-09-04,2015-09-06] |
+-----+------------------------------------+

我使用的代码:

df = df.groupby("id").agg(F.collect_set('date'))

但是我收到一个空的df和DAG警告:

df.show()

+--------+
|id |date|
+--------+
+--------+

WARN DAGScheduler:广播大小为1738.4 KiB的大型任务二进制文件

((我也尝试使用count而不是collect_set,但出现了完全相同的问题。)

我使用conf.set(“ spark.sql.shuffle.partitions”,3000)将分区从默认的200增加到3000,以获得每个大约100MB的分区。这没有帮助:

[第三阶段:============================================= =======>(2979 +11)/ 3000] 20/02/03 09:44:02 WARN DAG时间表:大型广播大小为1591.9 KiB的任务二进制文件[Stage4:================================================ ==>(2936 + 10)/3000] 20/02/03 09:44:10 WARN DAGScheduler:广播大型任务二进制文件,大小为1768.8 KiB

最后,我只是决定使用RDD和reduceByKey来避免此问题。这种方式似乎比较慢---我创建了一个python函数来执行collect_set的工作。

无论如何,我不明白groupBy的问题是什么,有人可以帮我理解吗?

...................................................... ..............................................

我正在使用的Spark版本是3.0.0-preview2

我正在使用以下配置:

on['JAVA_HOME'] = "/opt/jdk/jdk1.8.0_241"
sys.path.insert(0, spark_home + "/python/lib/py4j-0.10.8.1-src.zip")

conf = SparkConf()

conf.set("spark.executor.memory", "4g")
conf.set("spark.driver.memory", "6g")
conf.set("spark.sql.shuffle.partitions", 3000)
pyspark group-by apache-spark-sql partitioning
1个回答
0
投票

您必须同时使用concat_wsagg

df = df.groupby("id").agg(concat_ws(",",F.collect_set('date')))
© www.soinside.com 2019 - 2024. All rights reserved.