This is a PySpark overlapping-time-interval problem:
data = [
(1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
(1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
(1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
(1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
(3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
columns = ["station_id", "start_time", "end_time", "partition_date"]
I am trying to identify overlapping records for the same station_id based on the start_time and end_time fields. Once identified, I only want to keep the row with the most recent partition_date and drop the overlapping rows with older partition dates.
The expected output is:
output = [
(1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
(1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
(3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
I have tried several approaches, from joins to window functions, but either way I never quite reach the expected result, which is: identify the overlapping rows, keep only the most recent of them while dropping the rest, and also keep all non-overlapping rows. The goal is to count any given time span only once per station_id, so for example 5:00 to 6:00 should produce exactly one record for a station_id.
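For reference, two half-open intervals [s1, e1) and [s2, e2) overlap exactly when s1 < e2 and s2 < e1. A minimal pure-Python sketch of that predicate (the `overlaps` helper is illustrative, not part of the pipeline); ISO-8601 timestamp strings compare correctly as plain strings:

```python
def overlaps(s1, e1, s2, e2):
    """True when half-open intervals [s1, e1) and [s2, e2) intersect."""
    return s1 < e2 and s2 < e1

# Rows 1 and 2 of the sample share 05:30-06:00, so they overlap:
print(overlaps("2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z",
               "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z"))  # True
# Rows that merely touch (end == start) do not:
print(overlaps("2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z",
               "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z"))  # False
```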
One solution I tried:
dl_ota_airings_df_dup = dl_ota_airings_df_3.selectExpr(
    "station_id as station_id2",
    "start_time as start_time2",
    "end_time as end_time2",
    "content_id as content_id2",
    "partition_date as partition_date2",
)
join_condition = (
    (dl_ota_airings_df_3["station_id"] == dl_ota_airings_df_dup["station_id2"])
    & ((dl_ota_airings_df_3["start_time"] >= dl_ota_airings_df_dup["start_time2"])
       & (dl_ota_airings_df_3["start_time"] < dl_ota_airings_df_dup["end_time2"]))
    | ((dl_ota_airings_df_3["end_time"] <= dl_ota_airings_df_dup["end_time2"])
       & (dl_ota_airings_df_3["end_time"] > dl_ota_airings_df_dup["start_time2"]))
)
# &
# ((dl_ota_airings_df_3["start_time"] != dl_ota_airings_df_dup["start_time2"])
#  & (dl_ota_airings_df_3["end_time"] != dl_ota_airings_df_dup["end_time2"])
#  & (dl_ota_airings_df_3["partition_date"] != dl_ota_airings_df_dup["partition_date2"]))
df_overlapping = dl_ota_airings_df_3.join(dl_ota_airings_df_dup, join_condition, "left")
dl_ota_airings_df_4 = (
    df_overlapping
    .filter("station_id2 is null or (partition_date > partition_date2)")
    .drop("station_id2", "start_time2", "end_time2", "content_id2", "partition_date2")
    .dropDuplicates()
)
There are always edge cases this logic does not catch: for any overlap within a station_id, I only want to keep the single record from the latest partition and drop the rest. Please advise, or point me in the right direction.
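Before changing approach entirely, note one mechanical issue in the attempted join condition: Python's `&` binds more tightly than `|`, so as posted the `station_id` equality only guards the start_time branch, not the end_time branch. A tiny pure-Python check of the grouping (plain booleans standing in for the Column expressions):

```python
# A = station_ids match, B = start_time branch, C = end_time branch
A, B, C = False, True, True

posted = A & B | C       # how the posted condition groups: (A & B) | C
intended = A & (B | C)   # station match applied to both branches

print(posted, intended)  # True False -> rows from other stations can join
```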
You can try the window-based approach below: sort each station's rows by start_time, flag a row as overlapping when it intersects a neighbouring row, and keep only rows that either do not overlap or carry the station's latest partition_date. Note that partition_date is an M/D/YY string; a lexicographic max happens to work for this sample, but in general you would convert it first with F.to_date(F.col("partition_date"), "M/d/yy").
from pyspark.sql import functions as F
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("station_id").orderBy("start_time")

result_df = df.withColumn("prev_end_time", F.lag("end_time").over(windowSpec)) \
    .withColumn("next_start_time", F.lead("start_time").over(windowSpec)) \
    .withColumn("overlap", F.when((F.col("start_time") < F.col("prev_end_time"))
                                  | (F.col("next_start_time") < F.col("end_time")), True).otherwise(False)) \
    .withColumn("max_partition_date", F.max("partition_date").over(Window.partitionBy("station_id"))) \
    .filter((~F.col("overlap")) | (F.col("partition_date") == F.col("max_partition_date"))) \
    .select("station_id", "start_time", "end_time", "partition_date")
# lead() flags the first row of an overlap chain, which lag() alone misses.
# If one interval can span several later rows, replace lag("end_time") with a
# running max of end_time over preceding rows for full generality.
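To sanity-check the logic without a Spark session, the same lag/lead overlap test can be simulated in plain Python on the sample data (the `dedupe` helper is illustrative; the string max over partition_date is safe here only because the sample dates share the same month/day widths):

```python
from itertools import groupby

data = [
    (1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

def dedupe(rows):
    """Keep non-overlapping rows; among overlapping rows keep only those
    from the latest partition_date, mirroring the lag/lead window logic."""
    kept = []
    rows = sorted(rows, key=lambda r: (r[0], r[1]))
    for _, grp in groupby(rows, key=lambda r: r[0]):
        grp = list(grp)
        max_pd = max(r[3] for r in grp)  # string max: OK for this sample
        for i, (sid, start, end, pd) in enumerate(grp):
            prev_end = grp[i - 1][2] if i > 0 else None
            next_start = grp[i + 1][1] if i < len(grp) - 1 else None
            overlap = (prev_end is not None and start < prev_end) or \
                      (next_start is not None and next_start < end)
            if not overlap or pd == max_pd:
                kept.append((sid, start, end, pd))
    return kept

for row in dedupe(data):
    print(row)  # matches the expected output above
```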