How to interpolate missing values based on the sum of the gap using pyspark?


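I have a time series dataset with four fields: user_id, timestamp, miles, and total_mileage. miles is the number of miles driven during a time step, and total_mileage is the car's odometer reading at the end of that time step. Both miles and total_mileage have missing values, and one or more consecutive rows can be missing. I want to interpolate the missing miles values based on the total mileage covered by the gap. The time steps are evenly spaced. For example: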

user_id  timestamp         miles  total_mileage
1        2023-01-01 00:00  3      10
1        2023-01-01 01:00
1        2023-01-01 02:00
1        2023-01-01 03:00
1        2023-01-01 04:00  4      20

should become:

user_id  timestamp         miles  total_mileage  missing
1        2023-01-01 00:00  3      10             0
1        2023-01-01 01:00  2      12             1
1        2023-01-01 02:00  2      14             1
1        2023-01-01 03:00  2      16             1
1        2023-01-01 04:00  4      20             0

My dataset contains multiple users, so this needs to be done per user.

I wrote this function:

from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as F


def id_missing_gaps(df: DataFrame, val_col: str, id_col: str, ts_col: str) -> DataFrame:
    windowSpec = Window.partitionBy(id_col).orderBy(ts_col)
    # Create 'value_forward' column using lead function to look at the next row's 'value'
    df = df.withColumn("value_forward", F.lead(val_col).over(windowSpec))
    # Create 'value_backward' column using lag function to look at the previous row's 'value'
    df = df.withColumn("value_backward", F.lag(val_col).over(windowSpec))
    # Using lead() to look at the next row's 'value' for is_before_gap
    df = df.withColumn("next_value", F.lead(val_col).over(windowSpec))

    df = df.withColumn("is_before_gap", F.when((F.col(val_col).isNotNull()) & 
                        (F.lead(val_col).over(windowSpec).isNull()) &
                        (F.lead(ts_col).over(windowSpec).isNotNull()), # Ensure it's not the last row
                        True).otherwise(False))

    df = df.withColumn("is_after_gap", F.when(
                            (F.col(val_col).isNotNull()) & 
                            (F.col("value_backward").isNull()) &
                            (F.lag(ts_col).over(windowSpec).isNotNull()), # Ensure it's not the first row
                            True).otherwise(False))

    # Clean up temporary columns
    df = df.drop("next_value", "value_forward", "value_backward")
    
    df = df.withColumn("is_gap", F.when(((F.col(val_col).isNull()) |
                        (F.col("is_before_gap") == True) |
                        (F.col("is_after_gap") == True)), 
                        1).otherwise(0))
    
    df = df.filter(F.col("is_gap") == 1)

    return df

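This identifies every "gap" of missing values along with the values immediately before and after it. I thought I could use the remaining dataframe to interpolate all the values, but I have not been able to find a way to do what I want.

So, for the example above, the output of my function is: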

user_id  timestamp            miles  total_mileage  is_before_gap  is_after_gap  is_gap
1        2023-01-01 00:00:00  3      10             true                         1
1        2023-01-01 01:00:00                                                     1
1        2023-01-01 02:00:00                                                     1
1        2023-01-01 03:00:00                                                     1
1        2023-01-01 04:00:00  4      20                            true          1

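I did find an answer about interpolation in pyspark, How to interpolate a column inside a grouped object in PySpark?, and read the Medium article linked there: https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1 However, neither takes total_mileage into account.

One option I am considering is to compute total_mileage for the last missing time step (16 in the example), interpolate the total_mileage column with the method from the links above, and then derive the miles column from those values. However, this seems inefficient. The data I am working with has millions of rows, so I am hesitant to use pyspark pandas and/or UDFs.

If anyone can point me in the right direction on how to proceed, I would greatly appreciate it.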

dataframe pyspark data-cleaning linear-interpolation
1 Answer

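This is a very interesting problem to solve.

In pyspark, you can fill a column over a window specification with either the first or the last non-null value.

We can then also identify the groups of null rows that are clustered together and rank over them.

Once we have those two pieces, computing the interpolated values is just arithmetic on the filled values and the rank.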
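To make that arithmetic concrete, here is a minimal plain-Python sketch for the rows of a single user, already ordered by timestamp. The function name `fill_gaps` and the list-based layout are illustrative assumptions, not part of the Spark code; the sketch also assumes miles and total_mileage are missing on the same rows, and it leaves leading or trailing gaps untouched because they lack a known reading on one side.

```python
from typing import List, Optional, Tuple

Number = Optional[float]


def fill_gaps(miles: List[Number], totals: List[Number]) -> Tuple[List[Number], List[Number]]:
    """Fill runs of missing rows for one user (hypothetical helper)."""
    miles_out, totals_out = list(miles), list(totals)
    n = len(miles)
    i = 0
    while i < n:
        if miles[i] is not None:
            i += 1
            continue
        # Rows i .. j-1 form one run of missing values
        j = i
        while j < n and miles[j] is None:
            j += 1
        # Fill only when a known row exists on both sides of the gap
        if i > 0 and j < n and totals[i - 1] is not None and totals[j] is not None:
            start = totals[i - 1]        # odometer reading just before the gap
            end = totals[j] - miles[j]   # odometer reading at the end of the gap
            step = (end - start) / (j - i)
            for rank, k in enumerate(range(i, j), start=1):
                miles_out[k] = step
                totals_out[k] = start + step * rank
        i = j
    return miles_out, totals_out


m, t = fill_gaps([3, None, None, None, 4], [10, None, None, None, 20])
print(m)  # [3, 2.0, 2.0, 2.0, 4]
print(t)  # [10, 12.0, 14.0, 16.0, 20]
```

The gap covers 20 - 4 - 10 = 6 miles over three missing rows, so each row gets 2 miles. The Spark version produces `start` with a forward fill, `end` with a backward fill, and `rank` with a ranking inside each group.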

Here is a working example.

import pyspark.sql.functions as F

from pyspark.sql import SparkSession
# Import only the names used below; a wildcard import would shadow Python built-ins such as sum
from pyspark.sql.functions import col, first, last, lead, rank, when
from pyspark.sql.window import Window

# Sample data
data = [
    ("1", "2023-01-01 00:00", 3, 10),
    ("1", "2023-01-01 01:00", None, None),
    ("1", "2023-01-01 02:00", None, None),
    ("1", "2023-01-01 03:00", None, None),
    ("1", "2023-01-01 04:00", 4, 20),
    ("1", "2023-01-01 05:00", None, None),
    ("1", "2023-01-01 06:00", None, None),
    ("1", "2023-01-01 07:00", None, None),
    ("1", "2023-01-01 08:00", 4, 30)

]

schema = ["user_id", "timestamp", "miles", "total_mileage"]


spark = SparkSession.builder.appName("InterpolateNulls").getOrCreate()

df = spark.createDataFrame(data, schema=schema)
df = df.withColumn("was_missing", when(col("miles").isNull(), 1).otherwise(0))
df.show()

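# Frame from the first row up to the current row: used to forward-fill the last known value
windowSpecLast = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
# Frame from the current row to the last row: used to back-fill the next known value
windowSpecFirst = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(0, Window.unboundedFollowing)
windowSpecNormal = Window.partitionBy("user_id").orderBy("timestamp")

# Odometer reading at the start of a known time step, i.e. the reading at the end of the gap before it
df = df.withColumn("diff_mile", col("total_mileage") - col("miles"))

# last_nonnull_mileage: odometer reading just before the gap (forward fill)
# first_nonnull_mileage: odometer reading at the end of the gap (backward fill of diff_mile)
# first_mileage: total_mileage of the next row, used below to detect where a gap starts
df = df.withColumn("last_nonnull_mileage", last("total_mileage", ignorenulls=True).over(windowSpecLast))
df = df.withColumn("first_nonnull_mileage", first("diff_mile", ignorenulls=True).over(windowSpecFirst))
df = df.withColumn("first_mileage", lead("total_mileage").over(windowSpecNormal))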

df.show()

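# Mark each known row that is followed by a missing row (or is the last row); it opens a new group
df = df.withColumn("start_col", F.when(
    ((F.col("total_mileage").isNotNull()) & (F.col("first_mileage").isNull())), 1).otherwise(0))


# A running sum of the markers gives each marked row and the rows after it, up to the next marker, a shared groupId
windowSpec = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)

df = df.withColumn("groupId", F.sum("start_col").over(windowSpec))

# Count the missing rows in each group
df_nulls_grouped = df.groupby("groupId", "user_id").agg(F.count(F.when(F.col("miles").isNull(), 1)).alias("nulls_in_miles"))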

# Display the result
df_nulls_grouped.show()

df_new = df.join(df_nulls_grouped, on=["user_id", "groupId"], how="inner")

df_new.orderBy("timestamp").show()

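windowSpecRankOverNulls = Window.partitionBy("user_id", "groupId").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)

# Position of each row inside its group; the first row of the group gets rank 1
df_ranked = df_new.withColumn("ranked", rank().over(windowSpecRankOverNulls))

df_ranked.show()

# Spread the mileage covered by the gap evenly over its missing rows.
# The when() guard avoids dividing by zero for groups without missing rows, which raises an error when ANSI mode is enabled; the result stays NULL there.
df_ranked = df_ranked.withColumn("miles_inter", when(col("nulls_in_miles") > 0, (col("first_nonnull_mileage") - col("last_nonnull_mileage")) / col("nulls_in_miles")))
# Rebuild the odometer reading: the reading before the gap plus the miles added so far
df_ranked = df_ranked.withColumn("total_mileage_inter", col("last_nonnull_mileage") + (col("miles_inter") * ( col("ranked") - 1)))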

df_ranked.show()

df_final = df_ranked.withColumn("miles_final", when(col("miles").isNotNull(), col("miles")).otherwise(col("miles_inter")))
df_final = df_final.withColumn("total_mileage_final", when(col("total_mileage").isNotNull(), col("total_mileage")).otherwise(col("total_mileage_inter")))
df_final.show()

df_final = df_final.select("user_id", "timestamp", "miles_final", "total_mileage_final", "was_missing")
df_final.show()

Final output:

+-------+----------------+-----------+-------------------+-----------+
|user_id|       timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
|      1|2023-01-01 00:00|        3.0|               10.0|          0|
|      1|2023-01-01 01:00|        2.0|               12.0|          1|
|      1|2023-01-01 02:00|        2.0|               14.0|          1|
|      1|2023-01-01 03:00|        2.0|               16.0|          1|
|      1|2023-01-01 04:00|        4.0|               20.0|          0|
|      1|2023-01-01 05:00|        2.0|               22.0|          1|
|      1|2023-01-01 06:00|        2.0|               24.0|          1|
|      1|2023-01-01 07:00|        2.0|               26.0|          1|
|      1|2023-01-01 08:00|        4.0|               30.0|          0|
+-------+----------------+-----------+-------------------+-----------+
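One way to sanity-check a result like this is to confirm that every row's total_mileage equals the previous row's total_mileage plus that row's miles. The helper below is a hypothetical plain-Python check (the name `find_inconsistent_rows` and the tuple layout are assumptions), intended for a small sample of one user's rows pulled back to the driver, not for the full dataset.

```python
import math
from typing import Iterable, List, Tuple

# (timestamp, miles, total_mileage) for a single user
Row = Tuple[str, float, float]


def find_inconsistent_rows(rows: Iterable[Row], tol: float = 1e-9) -> List[str]:
    """Return timestamps where total_mileage != previous total_mileage + miles."""
    bad = []
    prev_total = None
    # Timestamps in "YYYY-MM-DD HH:MM" form sort correctly as strings
    for ts, miles, total in sorted(rows, key=lambda r: r[0]):
        if prev_total is not None and not math.isclose(prev_total + miles, total, abs_tol=tol):
            bad.append(ts)
        prev_total = total
    return bad


sample = [
    ("2023-01-01 00:00", 3.0, 10.0),
    ("2023-01-01 01:00", 2.0, 12.0),
    ("2023-01-01 02:00", 2.0, 14.0),
    ("2023-01-01 03:00", 2.0, 16.0),
    ("2023-01-01 04:00", 4.0, 20.0),
]
print(find_inconsistent_rows(sample))  # []
```

An empty list means the filled miles add up to the known odometer readings on both sides of each gap.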

Full output:

+-------+----------------+-----+-------------+-----------+
|user_id|       timestamp|miles|total_mileage|was_missing|
+-------+----------------+-----+-------------+-----------+
|      1|2023-01-01 00:00|    3|           10|          0|
|      1|2023-01-01 01:00| NULL|         NULL|          1|
|      1|2023-01-01 02:00| NULL|         NULL|          1|
|      1|2023-01-01 03:00| NULL|         NULL|          1|
|      1|2023-01-01 04:00|    4|           20|          0|
|      1|2023-01-01 05:00| NULL|         NULL|          1|
|      1|2023-01-01 06:00| NULL|         NULL|          1|
|      1|2023-01-01 07:00| NULL|         NULL|          1|
|      1|2023-01-01 08:00|    4|           30|          0|
+-------+----------------+-----+-------------+-----------+

+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
|user_id|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|
|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|
|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|
|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|
|      1|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|
|      1|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|
|      1|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|
|      1|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|
|      1|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+

+-------+-------+--------------+
|groupId|user_id|nulls_in_miles|
+-------+-------+--------------+
|      1|      1|             3|
|      2|      1|             3|
|      3|      1|             0|
+-------+-------+--------------+

+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
|user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
|      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|
|      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|
|      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|
|      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|
|      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|
|      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|
|      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|
|      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|
|      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+

+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
|user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
|      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|     1|
|      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     2|
|      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     3|
|      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|     4|
|      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|     1|
|      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     2|
|      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     3|
|      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|     4|
|      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|     1|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+

+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
|user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|        miles_inter|total_mileage_inter|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
|      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|     1|               -1.0|               10.0|
|      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     2|                2.0|               12.0|
|      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     3|                2.0|               14.0|
|      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|     4|                2.0|               16.0|
|      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|     1|-1.3333333333333333|               20.0|
|      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     2|                2.0|               22.0|
|      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     3|                2.0|               24.0|
|      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|     4|                2.0|               26.0|
|      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|     1|               NULL|               NULL|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+

+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
|user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|        miles_inter|total_mileage_inter|miles_final|total_mileage_final|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
|      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|     1|               -1.0|               10.0|        3.0|               10.0|
|      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     2|                2.0|               12.0|        2.0|               12.0|
|      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     3|                2.0|               14.0|        2.0|               14.0|
|      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|     4|                2.0|               16.0|        2.0|               16.0|
|      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|     1|-1.3333333333333333|               20.0|        4.0|               20.0|
|      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     2|                2.0|               22.0|        2.0|               22.0|
|      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     3|                2.0|               24.0|        2.0|               24.0|
|      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|     4|                2.0|               26.0|        2.0|               26.0|
|      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|     1|               NULL|               NULL|        4.0|               30.0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+

+-------+----------------+-----------+-------------------+-----------+
|user_id|       timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
|      1|2023-01-01 00:00|        3.0|               10.0|          0|
|      1|2023-01-01 01:00|        2.0|               12.0|          1|
|      1|2023-01-01 02:00|        2.0|               14.0|          1|
|      1|2023-01-01 03:00|        2.0|               16.0|          1|
|      1|2023-01-01 04:00|        4.0|               20.0|          0|
|      1|2023-01-01 05:00|        2.0|               22.0|          1|
|      1|2023-01-01 06:00|        2.0|               24.0|          1|
|      1|2023-01-01 07:00|        2.0|               26.0|          1|
|      1|2023-01-01 08:00|        4.0|               30.0|          0|
+-------+----------------+-----------+-------------------+-----------+