我有一个pyspark数据框。这是电影数据集。一栏是被“ |”分割的流派。每部电影都有多种流派。
genres = spark.sql("SELECT DISTINCT genres FROM movies ORDER BY genres ASC")
genres.show(5)
这是一种方法:
# sample data
d = [('Action',), ('Action|Adventure',), ('Action|Adventure|Drama',)]
df = spark.createDataFrame(d, ['genres',])
# create count
agg_df = (df
.rdd
.map(lambda x: x.genres.split('|')) # gives nested list
.flatMap(lambda x: x) # flatten the list
.map(lambda x: (x,)) # convert to tuples
.toDF(['genres'])
.groupby('genres')
.count())
agg_df.show()
+---------+-----+
| genres|count|
+---------+-----+
|Adventure| 2|
| Drama| 1|
| Action| 3|
+---------+-----+
这里是仅使用DataFrame API的方法。首先,使用split
函数拆分genres
字符串,然后使用explode
结果数组和groupBy genres
进行计数:
data = [["Action"], ["Action|Adventure|Thriller"], ["Action|Adventure|Drama"]]
df = spark.createDataFrame(data, ["genres"])
df = df.withColumn("genres", explode(split(col("genres"), "[|]"))) \
.groupBy("genres").count()
df.show()
给予:
+---------+-----+
| genres|count|
+---------+-----+
| Thriller| 1|
|Adventure| 2|
| Drama| 1|
| Action| 3|
+---------+-----+
用途:
import pyspark.sql.functions as f
df.groupby("generes").agg(f.collect_set("Category"),f.count("Category")).show()
这将获得所需的输出。