我有如下的员工数据。我想按 EMP_ID 对以下数据进行分组,如果该分组的 EMP_ID 的“状态”值为“未完成”,则 EMP_ID 的整个“总体状态”应标记为“未完成”。如何在 Dataframe 或 SparkSql 中实现这一目标?
输入:
EMP_ID | 状态 |
---|---|
1 | 完成 |
1 | 未完成 |
1 | 完成 |
2 | 完成 |
2 | 完成 |
预期输出:
EMP_ID | 状态 | 整体状态 |
---|---|---|
1 | 完成 | 未完成 |
1 | 未完成 | 未完成 |
1 | 完成 | 未完成 |
2 | 完成 | 完成 |
2 | 完成 | 完成 |
我尝试使用简单的策略来解决它。
我对
EMP_ID
进行了分组并收集了 Status
列的不同值。
然后,我根据以下事实创建了一个
overall_status
列:如果不同值包含 Not Done
,则该列将具有值 Not Done
,否则为 Done
。
然后将这个创建的数据框与原始数据框加入
EMP_ID
import sys
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
sc = SparkContext('local')
sqlContext = SQLContext(sc)
data1 = [
["1", "Done"],
["1", "Not Done"],
["1", "Done"],
["2", "Done"],
["2", "Done"],
]
df1Columns = ["EMP_ID", "Status"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
df1.show(n=100, truncate=False)
df1_unique_values = df1.groupby("EMP_ID").agg(F.collect_set( F.col("Status")).alias("distinct_status")) \
.withColumn("overall_status", F.when( F.array_contains( F.col("distinct_status"), "Not Done"), "Not Done").otherwise("Done") ).drop("distinct_status")
df1_unique_values.show(n=100, truncate=False)
df1_final = df1.join(df1_unique_values, on=["EMP_ID"])
df1_final.show(n=100, truncate=False)
输出:
+------+--------+
|EMP_ID|Status |
+------+--------+
|1 |Done |
|1 |Not Done|
|1 |Done |
|2 |Done |
|2 |Done |
+------+--------+
+------+--------------+
|EMP_ID|overall_status|
+------+--------------+
|1 |Not Done |
|2 |Done |
+------+--------------+
+------+--------+--------------+
|EMP_ID|Status |overall_status|
+------+--------+--------------+
|1 |Done |Not Done |
|1 |Not Done|Not Done |
|1 |Done |Not Done |
|2 |Done |Done |
|2 |Done |Done |
+------+--------+--------------+