如何转换所发布问题的数据集?

问题描述 投票:0回答:1

我想就我现在面临的问题寻求一些帮助。给定数据集:

df = spark.createDataFrame([
        ('2024-01-01', 1, 23),
        ('2024-01-02', 1, 43),
        ('2024-01-03', 1, -1),
        ('2024-01-08', 2, 266),
        ('2024-01-09', 2, -1),
        ('2024-01-10', 2, 13),
        ('2024-01-11', 2, 10),
        ('2024-01-04', 3, 66),
        ('2024-01-05', 3, -1),
        ('2024-01-06', 3, 13),
        ('2024-01-07', 3, 11),
    ],
    ["dates", "id", "mount"]
)

我想知道我应该应用哪种转换来获取每个 id 连续天数高于 0

例如 id 1 比 0 早 2 天(前两天)。那么 id 2 仅比 0 高 1 天,然后是 2 天(最后 2 天)。最后第三个用户是 1 天,最后 2 个用户是 0 以上。最终的数据框应该如下所示:

some_id, id, days
         1,  2
         2,  1
         2,  2
         3,  1
         3,  2

我添加了第三个列名称作为 some_id 因为我很确定我们需要以某种方式识别某些情况。

非常感谢任何帮助。

pandas dataframe apache-spark pyspark apache-spark-sql
1个回答
0
投票

一个好的方法如下

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Consecutive Days") \
    .getOrCreate()

df = spark.createDataFrame([
    ('2024-01-01', 1, 23),
    ('2024-01-02', 1, 43),
    ('2024-01-03', 1, -1),
    ('2024-01-08', 2, 266),
    ('2024-01-09', 2, -1),
    ('2024-01-10', 2, 13),
    ('2024-01-11', 2, 10),
    ('2024-01-04', 3, 66),
    ('2024-01-05', 3, -1),
    ('2024-01-06', 3, 13),
    ('2024-01-07', 3, 11),
], ["dates", "id", "mount"])

df = df.withColumn("dates", F.to_date("dates"))

windowSpec = Window.partitionBy("id").orderBy("dates")

df = df.withColumn("isPositive", F.when(F.col("mount") > 0, 1).otherwise(0))
df = df.withColumn("prevValue", F.lag("isPositive").over(windowSpec))
df = df.withColumn("change", F.when(F.col("isPositive") != F.col("prevValue"), 1).otherwise(0))

df = df.withColumn("group", F.sum("change").over(windowSpec))

df_positive = df.filter(F.col("mount") > 0)

df_result = df_positive.groupBy("id", "group").agg(F.count("dates").alias("days"))

windowSpec2 = Window.partitionBy("id").orderBy("days")
df_final = df_result.withColumn("some_id", F.row_number().over(windowSpec2)).select("id", "days","some_id")

df_final.show()

这给了你你想要的

   id  days  some_id
0   1     2        1
1   2     1        1
2   2     2        2
3   3     1        1
4   3     2        2
© www.soinside.com 2019 - 2024. All rights reserved.