I have a large table like the one below:
id | status |
---|---|
1 | In Progress |
1 | In Progress |
2 | In Progress |
2 | Start |
1 | In Progress |
1 | Start |
3 | In Progress |
3 | Start |
1 | In Progress |
1 | Start |
1 | End |
1 | In Progress |
1 | End |
2 | In Progress |
3 | In Progress |
3 | End |
Using PySpark functions, I want to derive the id-wise count of each status in separate columns, as shown below:
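```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when

spark = SparkSession.builder.getOrCreate()

data = [
    (1, "In Progress"),
    (1, "In Progress"),
    (2, "In Progress"),
    (2, "Start"),
    (1, "In Progress"),
    (1, "Start"),
    (3, "In Progress"),
    (3, "Start"),
    (1, "In Progress"),
    (1, "Start"),
    (1, "End"),
    (1, "In Progress"),
    (1, "End"),
    (2, "In Progress"),
    (3, "In Progress"),
    (3, "End"),
]
df = spark.createDataFrame(data, ["id", "status"])

# when() without otherwise() yields null for non-matching rows, and count()
# skips nulls, so each expression counts the rows with one status per id.
output_df = df.groupBy("id").agg(
    count(when(df.status == "Start", True)).alias("start-count"),
    count(when(df.status == "In Progress", True)).alias("in-progress-count"),
    count(when(df.status == "End", True)).alias("end-count"),
)
# groupBy() does not guarantee row order, so sort before displaying
output_df.orderBy("id").show()
```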
Output:

```
+---+-----------+-----------------+---------+
| id|start-count|in-progress-count|end-count|
+---+-----------+-----------------+---------+
|  1|          2|                5|        2|
|  2|          1|                2|        0|
|  3|          1|                2|        1|
+---+-----------+-----------------+---------+
```
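To see why `count(when(...))` produces per-status counts, here is a plain-Python sketch (no Spark required) that mimics the null semantics on the same rows. `spark_like_when` and `spark_like_count` are hypothetical stand-ins written for illustration, not PySpark API: the first returns `None` (null) when the condition fails, as `when` does without `otherwise`, and the second counts only non-null values, as `count` does.

```python
from collections import defaultdict

# The question's (id, status) rows, as plain Python tuples
data = [
    (1, "In Progress"), (1, "In Progress"), (2, "In Progress"), (2, "Start"),
    (1, "In Progress"), (1, "Start"), (3, "In Progress"), (3, "Start"),
    (1, "In Progress"), (1, "Start"), (1, "End"), (1, "In Progress"),
    (1, "End"), (2, "In Progress"), (3, "In Progress"), (3, "End"),
]


def spark_like_when(condition, value):
    """Hypothetical stand-in for when(cond, value) without otherwise(): no match -> None (null)."""
    return value if condition else None


def spark_like_count(values):
    """Hypothetical stand-in for count(col): counts only non-null values."""
    return sum(1 for v in values if v is not None)


# Group the statuses by id, like df.groupBy("id")
groups = defaultdict(list)
for row_id, status in data:
    groups[row_id].append(status)

# One conditional count per status, like the three agg() expressions
result = {
    row_id: tuple(
        spark_like_count(spark_like_when(s == target, True) for s in statuses)
        for target in ("Start", "In Progress", "End")
    )
    for row_id, statuses in sorted(groups.items())
}
print(result)  # {1: (2, 5, 2), 2: (1, 2, 0), 3: (1, 2, 1)}
```

Each tuple is (start-count, in-progress-count, end-count), matching the Spark output table row by row.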
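Use higher-order functions. `size(filter(cl, x -> x == 'start'))` filters the collected array `cl` down to only the elements with `start` status, and `size` returns how many remain.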
Example:
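```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, expr
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([('1','in-progress'),('1','in-progress'),('2','in-progress'),('3','start')],['id','status'])
# Collect each id's statuses into an array `cl`, then count matches with filter() + size()
(df.groupBy("id").agg(collect_list(col("status")).alias("cl"))
    .withColumn("start-count", expr("size(filter(cl, x -> x == 'start'))"))
    .withColumn("in-progress-count", expr("size(filter(cl, x -> x == 'in-progress'))"))
    .drop("cl")  # hide the helper array so only the counts are shown
    .orderBy("id")
    .show(10, False))
#+---+-----------+-----------------+
#|id |start-count|in-progress-count|
#+---+-----------+-----------------+
#|1  |0          |2                |
#|2  |0          |1                |
#|3  |1          |0                |
#+---+-----------+-----------------+
```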
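The `collect_list` + `size(filter(...))` pipeline can be mirrored in plain Python to check the expected counts. This is a sketch, not Spark code: the grouping uses a `defaultdict`, and `size_of_filter` is a hypothetical helper standing in for the SQL expression.

```python
from collections import defaultdict

# The example's rows: (id, status), with ids as strings
rows = [("1", "in-progress"), ("1", "in-progress"), ("2", "in-progress"), ("3", "start")]

# Like groupBy("id").agg(collect_list("status").alias("cl")): one list of statuses per id
cl = defaultdict(list)
for row_id, status in rows:
    cl[row_id].append(status)


def size_of_filter(values, target):
    """Hypothetical helper mirroring size(filter(cl, x -> x == target))."""
    return len([x for x in values if x == target])


# (start-count, in-progress-count) per id, sorted by id
result = {
    row_id: (size_of_filter(statuses, "start"), size_of_filter(statuses, "in-progress"))
    for row_id, statuses in sorted(cl.items())
}
print(result)  # {'1': (0, 2), '2': (0, 1), '3': (1, 0)}
```

Compared with `count(when(...))`, this route first materializes every id's statuses as an array, so it is likely more memory-hungry for ids with very many rows.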