我有一个由
year
、month
、day
分区的增量表。分区列设置为字符串。
我需要从 Delta 表中读取过去 7 天的数据。例如,如果作业在周二运行,它应该检索上周一到周日的数据,总共 7 天。
from datetime import datetime,timedelta
from pyspark.sql.functions import *
trigger_date = "2024-05-07" # yyyy-mm-dd
trigger_date_obj = datetime.strptime(trigger_date, '%Y-%m-%d')
start_date_obj = trigger_date_obj - timedelta(days=8)
end_date_obj = trigger_date_obj - timedelta(days=2)
# Extract week, start_day and end_day
start_day = start_date_obj.day
start_month = start_date_obj.month
start_year = start_date_obj.year
end_day = end_date_obj.day
end_month = end_date_obj.month
end_year = end_date_obj.year
condition = ((col("year").between(start_year, end_year)) &
(col("month").between(start_month, end_month)) &
(col("day").between(start_day, end_day))
)
data = spark.read.format("delta").table("delta_table_name")
data = data.filter(condition)
当日期在同一月/一年内时(例如,如果触发日期是“2024-05-09”),此方法可以正常工作。但是,如果触发日期为“2024-05-07”,则无法产生所需的结果(不返回任何数据),因为开始日期 (2024-4-29) 和结束日期 (2024-5-5) 跨越不同的日期几个月。
如果可能的话,我们怎样才能有一个适用于所有场景的通用过滤条件?
我认为最简单的解决方案是从月、年和日列创建一个新的
DateType
列,然后根据该列进行过滤。
data.withColumn(
"date",
concat_ws("-",col("year"),col("month"),col("day")).cast("date")
).filter(
col("date").between(start_date_obj, end_date_obj)
)