从增量表中过滤开始日和结束日的数据

问题描述 投票:0回答:1

我有一个由

year
month
day
分区的增量表。分区列设置为字符串。

我需要从 Delta 表中读取过去 7 天的数据。例如,如果作业在周二运行,它应该检索上周一到周日的数据,总共 7 天。

from datetime import datetime,timedelta
from pyspark.sql.functions import *

trigger_date = "2024-05-07" # yyyy-mm-dd
trigger_date_obj = datetime.strptime(trigger_date, '%Y-%m-%d')
start_date_obj = trigger_date_obj - timedelta(days=8)
end_date_obj = trigger_date_obj - timedelta(days=2)

# Extract week, start_day and end_day
start_day = start_date_obj.day
start_month = start_date_obj.month
start_year = start_date_obj.year

end_day = end_date_obj.day
end_month = end_date_obj.month
end_year = end_date_obj.year


condition = ((col("year").between(start_year, end_year)) & 
             (col("month").between(start_month, end_month)) & 
             (col("day").between(start_day, end_day)) 
            )

data = spark.read.format("delta").table("delta_table_name") 
data = data.filter(condition)

当日期在同一月/一年内时(例如,如果触发日期是“2024-05-09”),此方法可以正常工作。但是,如果触发日期为“2024-05-07”,则无法产生所需的结果(不返回任何数据),因为开始日期 (2024-4-29) 和结束日期 (2024-5-5) 跨越不同的日期几个月。

如果可能的话,我们怎样才能有一个适用于所有场景的通用过滤条件?

scala apache-spark pyspark
1个回答
0
投票

我认为最简单的解决方案是从月、年和日列创建一个新的

DateType
列,然后根据该列进行过滤。

data.withColumn(
    "date",
    concat_ws("-",col("year"),col("month"),col("day")).cast("date")
).filter(
    col("date").between(start_date_obj, end_date_obj)
)
© www.soinside.com 2019 - 2024. All rights reserved.