假设我具有以下模式的5 TB数据,并且我正在使用Pyspark。
| id | date | Month | KPI_1 | ... | KPI_n
对于90%的KPI,我只需要知道合计(id,Month)级别的总和/最小/最大值。对于其余的10%,我需要根据日期知道第一个值。
我的一个选择是使用window
。例如,我可以做
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy("id", "Month").orderBy(F.desc("date"))
# for the 90% kpi
agg_df = df.withColumn("kpi_1", F.sum("kpi_1").over(w))
agg_df = agg_df.withColumn("kpi_2", F.max("kpi_2").over(w))
agg_df = agg_df.withColumn("kpi_3", F.min("kpi_3").over(w))
...
# Select last row for each window to get last accumulated sum for 90% kpis and last value for 10% kpi (which is equivalent to first value if ranked ascending).
# continue process agg_df with filters based on sum/max/min values of 90% KIPs.
但是我不确定如何选择每个窗口的最后一行。有没有人有任何建议,或者是否有更好的汇总方法?
假设我们有此数据
+---+----------+-------+-----+-----+
| id| date| month|kpi_1|kpi_2|
+---+----------+-------+-----+-----+
| 1|2000-01-01|2000-01| 1| 100|
| 1|2000-01-02|2000-01| 2| 200|
| 1|2000-01-03|2000-01| 3| 300|
| 1|2000-01-04|2000-01| 4| 400|
| 1|2000-01-05|2000-01| 5| 500|
| 1|2000-02-01|2000-02| 10| 11|
| 1|2000-02-02|2000-02| 20| 21|
| 1|2000-02-03|2000-02| 30| 31|
| 1|2000-02-04|2000-02| 40| 41|
+---+----------+-------+-----+-----+
并且我们要计算kpi_1
的最小值,最大值和总和,以获得每个组的kpi_2
的最后一个值。
可以通过按id
和month
对数据分组来获得最小值,最大值和和:
df_avg = df \
.groupBy("id","month") \
.agg(F.sum("kpi_1"), F.min("kpi_1"), F.max("kpi_1"), F.first("kpi_2"))\
.select("id", "month", "sum(kpi_1)", "min(kpi_1)", "max(kpi_1)")
df_avg.show()
打印
+---+-------+----------+----------+----------+
| id| month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|
+---+-------+----------+----------+----------+
| 1|2000-02| 100| 10| 40|
| 1|2000-01| 15| 1| 5|
+---+-------+----------+----------+----------+
获取每个组的kpi_2
的最后一个值比较困难。第一个想法可能是在按顺序排列的有序数据帧上使用聚合函数first()。一个简单的测试为我提供了正确的结果,但是不幸的是文档说明“该函数是不确定的,因为它的结果取决于行的顺序,这些顺序在洗牌后可能是不确定的”。
获取kpi_2
的最后一个值的更好方法是使用问题中所示的窗口。由于窗口功能row_number()将起作用:
w = Window.partitionBy("id", "Month").orderBy(F.desc("date"))
df_first = df.withColumn("row_number", F.row_number().over(w)).where("row_number = 1")\
.drop("row_number") \
.select("id", "month", "KPI_2")
df_first.show()
打印
+---+-------+-----+
| id| month|KPI_2|
+---+-------+-----+
| 1|2000-02| 41|
| 1|2000-01| 500|
+---+-------+-----+
将两个部分结合在一起可以得到理想的结果:
df_result = df_avg.join(df_first, ['id', 'month'])
df_result.show()
打印
+---+-------+----------+----------+----------+-----+
| id| month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|KPI_2|
+---+-------+----------+----------+----------+-----+
| 1|2000-02| 100| 10| 40| 41|
| 1|2000-01| 15| 1| 5| 500|
+---+-------+----------+----------+----------+-----+