How to aggregate 2 columns into a map in PySpark

I have a DataFrame like this:

a = spark.createDataFrame([['Alice', '2020-03-03', '1'], ['Bob', '2020-03-03', '1'], ['Bob', '2020-03-05', '2']], ['name', 'dt', 'hits'])
a.show()
+-----+----------+----+
| name|        dt|hits|
+-----+----------+----+
|Alice|2020-03-03|   1|
|  Bob|2020-03-03|   1|
|  Bob|2020-03-05|   2|
+-----+----------+----+

I want to group by name and aggregate the dt and hits columns into a map:

+-----+----------------------------------+
| name|map                               |
+-----+----------------------------------+
|Alice|{'2020-03-03': 1}                 |
|  Bob|{'2020-03-03': 1, '2020-03-05': 2}|
+-----+----------------------------------+

But this code throws an exception:

from pyspark.sql import functions as F
a = a.groupBy(F.col('name')).agg(F.create_map(F.col('dt'), F.col('hits')))

Py4JJavaError: An error occurred while calling o2920.agg.
: org.apache.spark.sql.AnalysisException: expression '`dt`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [name#1329], [name#1329, map(dt#1330, hits#1331) AS map(dt, hits)#1361]
+- LogicalRDD [name#1329, dt#1330, hits#1331], false

What am I doing wrong?

python pyspark pyspark-dataframes
1 Answer

The error occurs because create_map is not an aggregate function: every column referenced in agg must either appear in the groupBy or be wrapped in an aggregate. For Spark 2.4+, you can collect each column into an array with collect_list and zip the two arrays into a map with map_from_arrays, like this:

from pyspark.sql import functions as F

a.groupBy("name").agg(
    F.map_from_arrays(
        F.collect_list("dt"),    # map keys: all dt values for each name
        F.collect_list("hits"),  # map values: the corresponding hits
    ).alias("map")
).show(truncate=False)

#+-----+----------------------------------+
#|name |map                               |
#+-----+----------------------------------+
#|Bob  |[2020-03-03 -> 1, 2020-03-05 -> 2]|
#|Alice|[2020-03-03 -> 1]                 |
#+-----+----------------------------------+
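
A close variant (a sketch, also assuming Spark 2.4+, where map_from_entries is available): collect each (dt, hits) pair as a struct and build the map with map_from_entries. This keeps each key stored next to its value inside a single collect_list, rather than relying on two separate collect_list calls returning elements in the same order. The cast to int is an assumption that you want numeric map values, since hits is a string in the sample data.

from pyspark.sql import functions as F

# Collect (dt, hits) pairs as an array of key/value structs,
# then turn that array into one map per name.
a.groupBy("name").agg(
    F.map_from_entries(
        F.collect_list(F.struct("dt", F.col("hits").cast("int").alias("hits")))
    ).alias("map")
).show(truncate=False)

This produces the same maps as above, but with int values instead of strings.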