您好,有一个数据框,其中包含一些 user_id、活跃月份以及活跃月份。我需要执行下图所示的“active_months”列的计算,该数据框计算这个月有多少个连续活跃的月份用户处于活动状态。因此,当该用户花了 1 个多月才再次活跃时,我们将计数重新从 1 开始。
我无法对数据进行分组,我需要作为窗口函数工作,因为我在 user_id 级别还有其他操作要做
有人可以帮助我吗?
我尝试使用 Window().partitionBy(['account_id']).orderBy('reference_month').rowsBetween(Window.unboundedPreceding, Window.currentRow) 窗口函数,但它不会将计数保留为 1
希望我的测试数据框涵盖所有基础:
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql.window import Window
df = spark.createDataFrame([
(1, '2021-12-01', '2022-01-01'),
(1, '2022-01-01', '2022-02-01'),
(1, '2022-02-01', '2022-03-01'),
(2, '2023-01-01', '2023-03-01'),
(2, '2023-03-01', '2023-04-01'),
(2, '2023-04-01', '2023-07-01'),
], ['id', 'reference_month', 'lead_month'])
df = (
df
.select('id', f.col('reference_month').cast(DateType()), f.col('lead_month').cast(DateType()))
.withColumn('delta_lead_months', f.months_between(f.col('lead_month'), f.col('reference_month')))
.withColumn('active_months', f.count(f.col('reference_month')).over(Window.partitionBy('id').orderBy('reference_month').rowsBetween(Window.unboundedPreceding, Window.currentRow)))
)
df.show(truncate = False)
df.show(truncate = False)
和输出:
+---+---------------+----------+-----------------+-------------+
|id |reference_month|lead_month|delta_lead_months|active_months|
+---+---------------+----------+-----------------+-------------+
|1 |2021-12-01 |2022-01-01|1.0 |1 |
|1 |2022-01-01 |2022-02-01|1.0 |2 |
|1 |2022-02-01 |2022-03-01|1.0 |3 |
|2 |2023-01-01 |2023-03-01|2.0 |1 |
|2 |2023-03-01 |2023-04-01|1.0 |2 |
|2 |2023-04-01 |2023-07-01|3.0 |3 |
+---+---------------+----------+-----------------+-------------+