我的数据:
+-----+-----------+
| id | date|
+-----+-----------+
|id_1 |2015-09-03 |
|id_1 |2015-09-03 |
|id_2 |2015-09-03 |
|id_2 |2015-09-04 |
|id_2 |2015-09-06 |
+-----+-----------+
我正在尝试为每组“ id”收集日期集(然后计算出连续日期的最大数量)。
+-----+------------------------------------+
| id | date |
+-----+------------------------------------+
|id_1 | [2015-09-03] |
|id_2 | [2015-09-03,2015-09-04,2015-09-06] |
+-----+------------------------------------+
我使用的代码:
df = df.groupby("id").agg(F.collect_set('date'))
但是我收到一个空的df和DAG警告:
df.show()
+--------+
|id |date|
+--------+
+--------+
WARN DAGScheduler:广播大小为1738.4 KiB的大型任务二进制文件
((我也尝试使用count而不是collect_set,但出现了完全相同的问题。)
我使用conf.set(“ spark.sql.shuffle.partitions”,3000)将分区从默认的200增加到3000,以获得每个大约100MB的分区。这没有帮助:
[第三阶段:============================================= =======>(2979 +11)/ 3000] 20/02/03 09:44:02 WARN DAG时间表:大型广播大小为1591.9 KiB的任务二进制文件[Stage4:================================================ ==>(2936 + 10)/3000] 20/02/03 09:44:10 WARN DAGScheduler:广播大型任务二进制文件,大小为1768.8 KiB
最后,我只是决定使用RDD和reduceByKey来避免此问题。这种方式似乎比较慢---我创建了一个python函数来执行collect_set的工作。
无论如何,我不明白groupBy的问题是什么,有人可以帮我理解吗?
...................................................... ..............................................
我正在使用的Spark版本是3.0.0-preview2
我正在使用以下配置:
on['JAVA_HOME'] = "/opt/jdk/jdk1.8.0_241"
sys.path.insert(0, spark_home + "/python/lib/py4j-0.10.8.1-src.zip")
conf = SparkConf()
conf.set("spark.executor.memory", "4g")
conf.set("spark.driver.memory", "6g")
conf.set("spark.sql.shuffle.partitions", 3000)
您必须同时使用concat_ws
和agg
df = df.groupby("id").agg(concat_ws(",",F.collect_set('date')))