I have a time-series dataset with four fields: user_id, timestamp, miles, and total_mileage. miles is the number of miles driven within a time step, and total_mileage is the car's odometer reading at the end of that time step. Both miles and total_mileage have missing values, which can occur as one or more consecutive missing rows. I want to impute the missing miles values by spreading the total mileage change across the gap evenly over the missing steps. The time steps are evenly spaced. So, for example:
user_id | timestamp | miles | total_mileage |
---|---|---|---|
1 | 2023-01-01 00:00 | 3 | 10 |
1 | 2023-01-01 01:00 | null | null |
1 | 2023-01-01 02:00 | null | null |
1 | 2023-01-01 03:00 | null | null |
1 | 2023-01-01 04:00 | 4 | 20 |
should become:
user_id | timestamp | miles | total_mileage | was_missing |
---|---|---|---|---|
1 | 2023-01-01 00:00 | 3 | 10 | 0 |
1 | 2023-01-01 01:00 | 2 | 12 | 1 |
1 | 2023-01-01 02:00 | 2 | 14 | 1 |
1 | 2023-01-01 03:00 | 2 | 16 | 1 |
1 | 2023-01-01 04:00 | 4 | 20 | 0 |
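The arithmetic behind the expected output can be pinned down with a minimal pure-Python sketch (no Spark; the numbers are from the sample gap above):

```python
# Known endpoints of the gap
start_total = 10   # total_mileage at 00:00
end_total = 20     # total_mileage at 04:00
end_miles = 4      # miles driven in the final (known) step
n_missing = 3      # number of missing steps (01:00-03:00)

# Odometer at the start of the last known step is 20 - 4 = 16,
# so the gap covers 16 - 10 = 6 miles spread over 3 steps.
gap_total = (end_total - end_miles) - start_total
per_step = gap_total / n_missing  # 2.0 miles per missing step

# (total_mileage, miles) for each missing step
filled = [(start_total + per_step * (i + 1), per_step) for i in range(n_missing)]
print(filled)  # [(12.0, 2.0), (14.0, 2.0), (16.0, 2.0)]
```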
There are multiple users in my dataset, so this needs to be done per user.
I wrote this function:
```python
from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as F

def id_missing_gaps(df: DataFrame, val_col: str, id_col: str, ts_col: str) -> DataFrame:
    windowSpec = Window.partitionBy(id_col).orderBy(ts_col)
    # Look at the next and previous row's value
    df = df.withColumn("value_forward", F.lead(val_col).over(windowSpec))
    df = df.withColumn("value_backward", F.lag(val_col).over(windowSpec))
    # A row is "before a gap" if it is non-null and the next row's value is null
    df = df.withColumn("is_before_gap", F.when(
        (F.col(val_col).isNotNull()) &
        (F.col("value_forward").isNull()) &
        (F.lead(ts_col).over(windowSpec).isNotNull()),  # Ensure it's not the last row
        True).otherwise(False))
    # A row is "after a gap" if it is non-null and the previous row's value is null
    df = df.withColumn("is_after_gap", F.when(
        (F.col(val_col).isNotNull()) &
        (F.col("value_backward").isNull()) &
        (F.lag(ts_col).over(windowSpec).isNotNull()),  # Ensure it's not the first row
        True).otherwise(False))
    # Clean up temporary columns
    df = df.drop("value_forward", "value_backward")
    df = df.withColumn("is_gap", F.when(
        (F.col(val_col).isNull()) |
        (F.col("is_before_gap") == True) |
        (F.col("is_after_gap") == True),
        1).otherwise(0))
    df = df.filter(F.col("is_gap") == 1)
    return df
```
This identifies each "gap" of missing values together with the non-null values immediately before and after it. I thought I could use the resulting DataFrame to interpolate all the values, but I haven't been able to find a way to do what I want.
So, for the example above, the output of my function is:
user_id | timestamp | miles | total_mileage | is_before_gap | is_after_gap | is_gap |
---|---|---|---|---|---|---|
1 | 2023-01-01 00:00:00 | 3 | 10 | True | False | 1 |
1 | 2023-01-01 01:00:00 | null | null | False | False | 1 |
1 | 2023-01-01 02:00:00 | null | null | False | False | 1 |
1 | 2023-01-01 03:00:00 | null | null | False | False | 1 |
1 | 2023-01-01 04:00:00 | 4 | 20 | False | True | 1 |
I did find an answer about interpolation in PySpark: How to interpolate a column inside a grouped object in PySpark?, and read the Medium article linked in it: https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1, but it doesn't take total_mileage into account. One option I'm considering is to compute the total_mileage at the last missing time step (16 in the example), interpolate the total_mileage column using the method from the link above, and then use those values to compute the miles column. That seems inefficient, however. The data I'm working with has millions of rows, so I'm hesitant to use pandas-on-Spark and/or UDFs. I'd appreciate it if someone could point me in the right direction.
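For what it's worth, the option just described (interpolate total_mileage first, then derive miles as first differences) can be sketched in plain Python on the sample gap; this is only an illustration of the arithmetic, not a Spark implementation:

```python
# One gap from the sample data: known endpoints, three missing steps between
total = [10, None, None, None, 20]
miles = [3, None, None, None, 4]

# Odometer at the start of the last known step: 20 - 4 = 16
start, end = total[0], total[-1] - miles[-1]
n_gap = sum(1 for t in total if t is None)  # 3 missing steps

# Linear interpolation of total_mileage across the gap
step = (end - start) / n_gap
filled_total = [start] + [start + step * (i + 1) for i in range(n_gap)] + [total[-1]]
print(filled_total)  # [10, 12.0, 14.0, 16.0, 20]

# miles for each missing step is the difference between consecutive totals
filled_miles = [filled_total[i] - filled_total[i - 1] for i in range(1, n_gap + 1)]
print(filled_miles)  # [2.0, 2.0, 2.0]
```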
This is a really interesting problem to solve.
In PySpark, you can fill a column over a window spec with the first or the last non-null value.
We can also identify the groups of nulls that sit together, and then rank the rows within each group.
Once we have those two pieces, computing the interpolation is just arithmetic using the filled values and the rank.
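The null-group bookkeeping can be sketched outside Spark: the group id is a running sum over a "gap starts here" flag, and the rank is a per-group counter. A minimal plain-Python sketch mirroring the sample miles column:

```python
# miles column with None for missing steps (mirrors the sample data)
miles = [3, None, None, None, 4, None, None, None, 4]

# A group starts at a non-null row whose next value is null
# (lead() past the last row is also null, matching Spark's behaviour)
start_flags = [
    1 if m is not None and (i + 1 == len(miles) or miles[i + 1] is None) else 0
    for i, m in enumerate(miles)
]

# Running sum of the flags = group id (the F.sum-over-window trick)
group_ids, running = [], 0
for flag in start_flags:
    running += flag
    group_ids.append(running)
print(group_ids)  # [1, 1, 1, 1, 2, 2, 2, 2, 3]

# Rank within each group (rank() over a groupId-partitioned window)
ranks, counts = [], {}
for g in group_ids:
    counts[g] = counts.get(g, 0) + 1
    ranks.append(counts[g])
print(ranks)  # [1, 2, 3, 4, 1, 2, 3, 4, 1]
```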
Here is a working example.
```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, first, last, lead, rank, when
from pyspark.sql.window import Window

# Sample data
data = [
    ("1", "2023-01-01 00:00", 3, 10),
    ("1", "2023-01-01 01:00", None, None),
    ("1", "2023-01-01 02:00", None, None),
    ("1", "2023-01-01 03:00", None, None),
    ("1", "2023-01-01 04:00", 4, 20),
    ("1", "2023-01-01 05:00", None, None),
    ("1", "2023-01-01 06:00", None, None),
    ("1", "2023-01-01 07:00", None, None),
    ("1", "2023-01-01 08:00", 4, 30)
]
schema = ["user_id", "timestamp", "miles", "total_mileage"]

spark = SparkSession.builder.appName("InterpolateNulls").getOrCreate()
df = spark.createDataFrame(data, schema=schema)
df = df.withColumn("was_missing", when(col("miles").isNull(), 1).otherwise(0))
df.show()

windowSpecLast = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
windowSpecFirst = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(0, Window.unboundedFollowing)
windowSpecNormal = Window.partitionBy("user_id").orderBy("timestamp")

# total_mileage at the start of each row's time step
df = df.withColumn("diff_mile", col("total_mileage") - col("miles"))
# Last known mileage before (or at) each row, and the next known
# start-of-step mileage after (or at) each row
df = df.withColumn("last_nonnull_mileage", last("total_mileage", ignorenulls=True).over(windowSpecLast))
df = df.withColumn("first_nonnull_mileage", first("diff_mile", ignorenulls=True).over(windowSpecFirst))
df = df.withColumn("first_mileage", lead("total_mileage").over(windowSpecNormal))
df.show()

# A group starts at a non-null row whose next row is null
df = df.withColumn("start_col", F.when(
    (F.col("total_mileage").isNotNull()) & (F.col("first_mileage").isNull()), 1).otherwise(0))
# Running sum of the start flags gives each group an id
windowSpec = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
df = df.withColumn("groupId", F.sum("start_col").over(windowSpec))

# Count the null rows in each group
df_nulls_grouped = df.groupby("groupId", "user_id").agg(
    F.count(F.when(F.col("miles").isNull(), 1)).alias("nulls_in_miles"))
# Display the result
df_nulls_grouped.show()

df_new = df.join(df_nulls_grouped, on=["user_id", "groupId"], how="inner")
df_new.orderBy("timestamp").show()

# Rank rows within each group; the non-null row that opens the group gets rank 1
windowSpecRankOverNulls = Window.partitionBy("user_id", "groupId").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
df_ranked = df_new.withColumn("ranked", rank().over(windowSpecRankOverNulls))
df_ranked.show()

# Per-step increment and the interpolated running total
df_ranked = df_ranked.withColumn("miles_inter",
    (col("first_nonnull_mileage") - col("last_nonnull_mileage")) / col("nulls_in_miles"))
df_ranked = df_ranked.withColumn("total_mileage_inter",
    col("last_nonnull_mileage") + (col("miles_inter") * (col("ranked") - 1)))
df_ranked.show()

# Keep the original values where present, otherwise use the interpolated ones
df_final = df_ranked.withColumn("miles_final",
    when(col("miles").isNotNull(), col("miles")).otherwise(col("miles_inter")))
df_final = df_final.withColumn("total_mileage_final",
    when(col("total_mileage").isNotNull(), col("total_mileage")).otherwise(col("total_mileage_inter")))
df_final.show()

df_final = df_final.select("user_id", "timestamp", "miles_final", "total_mileage_final", "was_missing")
df_final.show()
```
Final output:
+-------+----------------+-----------+-------------------+-----------+
|user_id| timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
| 1|2023-01-01 00:00| 3.0| 10.0| 0|
| 1|2023-01-01 01:00| 2.0| 12.0| 1|
| 1|2023-01-01 02:00| 2.0| 14.0| 1|
| 1|2023-01-01 03:00| 2.0| 16.0| 1|
| 1|2023-01-01 04:00| 4.0| 20.0| 0|
| 1|2023-01-01 05:00| 2.0| 22.0| 1|
| 1|2023-01-01 06:00| 2.0| 24.0| 1|
| 1|2023-01-01 07:00| 2.0| 26.0| 1|
| 1|2023-01-01 08:00| 4.0| 30.0| 0|
+-------+----------------+-----------+-------------------+-----------+
Intermediate outputs (from the df.show() calls above, in order):
+-------+----------------+-----+-------------+-----------+
|user_id| timestamp|miles|total_mileage|was_missing|
+-------+----------------+-----+-------------+-----------+
| 1|2023-01-01 00:00| 3| 10| 0|
| 1|2023-01-01 01:00| NULL| NULL| 1|
| 1|2023-01-01 02:00| NULL| NULL| 1|
| 1|2023-01-01 03:00| NULL| NULL| 1|
| 1|2023-01-01 04:00| 4| 20| 0|
| 1|2023-01-01 05:00| NULL| NULL| 1|
| 1|2023-01-01 06:00| NULL| NULL| 1|
| 1|2023-01-01 07:00| NULL| NULL| 1|
| 1|2023-01-01 08:00| 4| 30| 0|
+-------+----------------+-----+-------------+-----------+
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
|user_id| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL|
| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL|
| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL|
| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20|
| 1|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL|
| 1|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL|
| 1|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL|
| 1|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30|
| 1|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
+-------+-------+--------------+
|groupId|user_id|nulls_in_miles|
+-------+-------+--------------+
| 1| 1| 3|
| 2| 1| 3|
| 3| 1| 0|
+-------+-------+--------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter|miles_final|total_mileage_final|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0| 3.0| 10.0|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0| 2.0| 12.0|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0| 2.0| 14.0|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0| 2.0| 16.0|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0| 4.0| 20.0|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0| 2.0| 22.0|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0| 2.0| 24.0|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0| 2.0| 26.0|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL| 4.0| 30.0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
+-------+----------------+-----------+-------------------+-----------+
|user_id| timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
| 1|2023-01-01 00:00| 3.0| 10.0| 0|
| 1|2023-01-01 01:00| 2.0| 12.0| 1|
| 1|2023-01-01 02:00| 2.0| 14.0| 1|
| 1|2023-01-01 03:00| 2.0| 16.0| 1|
| 1|2023-01-01 04:00| 4.0| 20.0| 0|
| 1|2023-01-01 05:00| 2.0| 22.0| 1|
| 1|2023-01-01 06:00| 2.0| 24.0| 1|
| 1|2023-01-01 07:00| 2.0| 26.0| 1|
| 1|2023-01-01 08:00| 4.0| 30.0| 0|
+-------+----------------+-----------+-------------------+-----------+