Identify overlapping records by timestamp and drop the older overlaps in PySpark

Problem description

This is a PySpark overlapping-time-interval problem:

Sample data

data = [
    (1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

columns = ["station_id", "start_time", "end_time", "partition_date"]

I am trying to identify overlapping records for the same station_id based on the start_time and end_time fields. Once an overlap is identified, I want to keep only the row with the most recent partition_date and drop the overlapping rows with older partition dates.

The expected output is:

output = [
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
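
For reference, a minimal sketch of turning the sample data into a DataFrame, reusing data and columns from above. The SparkSession named spark is an assumption (in a Databricks notebook it already exists):

from pyspark.sql import SparkSession

# Assumed entry point; in a Databricks notebook a SparkSession called `spark` is predefined.
spark = SparkSession.builder.getOrCreate()

# All four columns are plain strings at this point; timestamps and dates are not yet typed.
df = spark.createDataFrame(data, columns)
df.show(truncate=False)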

I have tried several approaches, from joins to window functions, but either way I end up short of the expected result: identify the overlapping rows, keep only the most recent overlapping row while dropping the rest, and also keep every row that does not overlap at all. The goal is to have a given time span counted only once per station_id, so for example 05:00 to 06:00 should produce exactly one record for a station_id.

One solution I tried:

# Mirror the DataFrame with renamed columns so it can be self-joined against itself.
dl_ota_airings_df_dup = dl_ota_airings_df_3.selectExpr("station_id as station_id2", "start_time as start_time2", "end_time as end_time2", "content_id as content_id2", "partition_date as partition_date2")

# Join rows of the same station whose start falls inside the other interval, or rows
# (of any station, since & binds tighter than |) whose end falls inside the other interval.
join_condition = ((dl_ota_airings_df_3["station_id"] == dl_ota_airings_df_dup["station_id2"]) &
    ((dl_ota_airings_df_3["start_time"] >= dl_ota_airings_df_dup["start_time2"]) & (dl_ota_airings_df_3["start_time"] < dl_ota_airings_df_dup["end_time2"]))
    |
    ((dl_ota_airings_df_3["end_time"] <= dl_ota_airings_df_dup["end_time2"]) & (dl_ota_airings_df_3["end_time"] > dl_ota_airings_df_dup["start_time2"])))
    # &
    # ((dl_ota_airings_df_3["start_time"] != dl_ota_airings_df_dup["start_time2"]) & (dl_ota_airings_df_3["end_time"] != dl_ota_airings_df_dup["end_time2"]) & (dl_ota_airings_df_3["partition_date"] != dl_ota_airings_df_dup["partition_date2"])))

df_overlapping = dl_ota_airings_df_3.join(dl_ota_airings_df_dup, join_condition, "left")

# Keep rows with no overlapping match, or rows whose partition_date is newer than the match's.
dl_ota_airings_df_4 = df_overlapping.filter("station_id2 is null or (partition_date > partition_date2)").drop("station_id2", "start_time2", "end_time2", "content_id2", "partition_date2").dropDuplicates()

There are always some edge cases that this logic fails to catch, because for any overlap available for any station_id I only want to keep the single record from the latest partition and drop the rest. Please advise, or point me in the right direction.
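
For reference, two intervals overlap exactly when each one starts before the other ends, i.e. start < end2 AND end > start2. A minimal sketch of that pairwise predicate, written against the renamed columns of the self-join above (the explicit grouping is my addition, not part of the original attempt):

from pyspark.sql import functions as F

# Standard pairwise overlap test for two half-open intervals on the same station:
# [start_time, end_time) and [start_time2, end_time2) overlap iff each starts before the other ends.
pairwise_overlap = (
    (F.col("station_id") == F.col("station_id2"))
    & (F.col("start_time") < F.col("end_time2"))
    & (F.col("end_time") > F.col("start_time2"))
)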

python join pyspark databricks overlapping
1 Answer

You can try the code below to achieve this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Per-station window ordered by start time, so each row can see the previous record's end_time.
windowSpec = Window.partitionBy("station_id").orderBy("start_time")

# Flag rows that start before the previous row ends, then keep non-overlapping rows plus
# overlapping rows whose partition_date equals the latest partition_date for the station.
result_df = df.withColumn("prev_end_time", F.lag("end_time").over(windowSpec)) \
    .withColumn("overlap", F.when(F.col("start_time") < F.col("prev_end_time"), True).otherwise(False)) \
    .withColumn("max_partition_date", F.max("partition_date").over(Window.partitionBy("station_id"))) \
    .filter((~F.col("overlap")) | (F.col("partition_date") == F.col("max_partition_date"))) \
    .select("station_id", "start_time", "end_time", "partition_date")
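
One caveat worth noting: in the sample data every column is a string, so partition_date values such as "1/24/24" compare lexically rather than chronologically (the ISO-style timestamps happen to sort correctly as text). A minimal sketch of casting to proper types first, assuming the question's DataFrame is named df:

from pyspark.sql import functions as F

# Cast to proper types so max() and comparisons are chronological, not lexical string order.
typed_df = (
    df
    .withColumn("start_time", F.col("start_time").cast("timestamp"))
    .withColumn("end_time", F.col("end_time").cast("timestamp"))
    .withColumn("partition_date", F.to_date("partition_date", "M/d/yy"))
)

The same window logic can then be run against typed_df instead of df.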