PySpark:在分组数据中查找特定值并将整个组标记为不同值

问题描述 投票:0回答:1

我有如下的员工数据。我想按 EMP_ID 对以下数据进行分组,如果该分组的 EMP_ID 的“状态”值为“未完成”,则 EMP_ID 的整个“总体状态”应标记为“未完成”。如何在 Dataframe 或 SparkSql 中实现这一目标?

输入:

EMP_ID 状态
1 完成
1 未完成
1 完成
2 完成
2 完成

预期输出:

EMP_ID 状态 整体状态
1 完成 未完成
1 未完成 未完成
1 完成 未完成
2 完成 完成
2 完成 完成
apache-spark pyspark apache-spark-sql aws-glue apache-spark-dataset
1个回答
0
投票

我尝试使用简单的策略来解决它。

我对

EMP_ID
进行了分组并收集了
Status
列的不同值。

然后,我根据以下事实创建了一个

overall_status
列:如果不同值包含
Not Done
,则该列将具有值
Not Done
,否则为
Done

然后将这个创建的数据框与原始数据框加入

EMP_ID

import sys
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf


sc = SparkContext('local')
sqlContext = SQLContext(sc)


data1 = [

["1",   "Done"],
["1",   "Not Done"],
["1",   "Done"],
["2",   "Done"],
["2",   "Done"],
      ]

df1Columns = ["EMP_ID", "Status"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)


df1.show(n=100, truncate=False)


df1_unique_values = df1.groupby("EMP_ID").agg(F.collect_set( F.col("Status")).alias("distinct_status")) \
                        .withColumn("overall_status", F.when(  F.array_contains( F.col("distinct_status"), "Not Done"), "Not Done").otherwise("Done")  ).drop("distinct_status")

df1_unique_values.show(n=100, truncate=False)


df1_final =  df1.join(df1_unique_values, on=["EMP_ID"])


df1_final.show(n=100, truncate=False)

输出:

+------+--------+
|EMP_ID|Status  |
+------+--------+
|1     |Done    |
|1     |Not Done|
|1     |Done    |
|2     |Done    |
|2     |Done    |
+------+--------+

+------+--------------+
|EMP_ID|overall_status|
+------+--------------+
|1     |Not Done      |
|2     |Done          |
+------+--------------+

+------+--------+--------------+
|EMP_ID|Status  |overall_status|
+------+--------+--------------+
|1     |Done    |Not Done      |
|1     |Not Done|Not Done      |
|1     |Done    |Not Done      |
|2     |Done    |Done          |
|2     |Done    |Done          |
+------+--------+--------------+
 
© www.soinside.com 2019 - 2024. All rights reserved.