我想就我现在面临的问题寻求一些帮助。给定数据集:
df = spark.createDataFrame([
('2024-01-01', 1, 23),
('2024-01-02', 1, 43),
('2024-01-03', 1, -1),
('2024-01-08', 2, 266),
('2024-01-09', 2, -1),
('2024-01-10', 2, 13),
('2024-01-11', 2, 10),
('2024-01-04', 3, 66),
('2024-01-05', 3, -1),
('2024-01-06', 3, 13),
('2024-01-07', 3, 11),
],
["dates", "id", "mount"]
)
我想知道我应该应用哪种转换来获取每个 id 连续天数高于 0。
例如 id 1 比 0 早 2 天(前两天)。那么 id 2 仅比 0 高 1 天,然后是 2 天(最后 2 天)。最后第三个用户是 1 天,最后 2 个用户是 0 以上。最终的数据框应该如下所示:
some_id, id, days
1, 2
2, 1
2, 2
3, 1
3, 2
我添加了第三个列名称作为 some_id 因为我很确定我们需要以某种方式识别某些情况。
非常感谢任何帮助。
一个好的方法如下
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Consecutive Days") \
.getOrCreate()
df = spark.createDataFrame([
('2024-01-01', 1, 23),
('2024-01-02', 1, 43),
('2024-01-03', 1, -1),
('2024-01-08', 2, 266),
('2024-01-09', 2, -1),
('2024-01-10', 2, 13),
('2024-01-11', 2, 10),
('2024-01-04', 3, 66),
('2024-01-05', 3, -1),
('2024-01-06', 3, 13),
('2024-01-07', 3, 11),
], ["dates", "id", "mount"])
df = df.withColumn("dates", F.to_date("dates"))
windowSpec = Window.partitionBy("id").orderBy("dates")
df = df.withColumn("isPositive", F.when(F.col("mount") > 0, 1).otherwise(0))
df = df.withColumn("prevValue", F.lag("isPositive").over(windowSpec))
df = df.withColumn("change", F.when(F.col("isPositive") != F.col("prevValue"), 1).otherwise(0))
df = df.withColumn("group", F.sum("change").over(windowSpec))
df_positive = df.filter(F.col("mount") > 0)
df_result = df_positive.groupBy("id", "group").agg(F.count("dates").alias("days"))
windowSpec2 = Window.partitionBy("id").orderBy("days")
df_final = df_result.withColumn("some_id", F.row_number().over(windowSpec2)).select("id", "days","some_id")
df_final.show()
这给了你你想要的
id days some_id
0 1 2 1
1 2 1 1
2 2 2 2
3 3 1 1
4 3 2 2