pyspark 中是否有“执行日期”或运行日期参数?

问题描述 投票:0回答:1

我想为历史数据运行脚本(2022 data_ 以日期目录格式 yyyy/mm/dd 存储在 azure data lake gen 2 中)

步骤:1

  1. 对于 2022 年的每个日期,我想提取过去 10 天左右的数据

  2. 对其进行一些计算,即根据最近 10 天计算汇率

  3. 在单独的文件夹中再次将此数据帧写入数据湖,例如 changed/yyyy/mm/dd

pyspark 中是否存在执行日期参数?

currently for (present) 时间 e.g.今天的日期 27/02/2023 我将提取过去 10 天(2 月 27 日至 2 月 17 日)的数据,计算比率,然后发送到文件夹 2023/02/27。为此,我设置了两个变量 startDate 和 endDate ,使用 datetime,now() 函数作为开始日期,使用 end_date 作为 timedelta(10) 。我需要能够完成历史数据的整个过程。

apache-spark date pyspark spark-streaming azure-databricks
1个回答
0
投票

您可以使用以下代码作为模板来实现需求。

from datetime import datetime,timedelta
import pandas as pd

last = datetime.strptime('2023-03-01', '%Y-%m-%d')        #the final day until which you want to read data
first = datetime.strptime('2023-01-01', '%Y-%m-%d')     #the day from which you want to start reading data from
current_date = last
while (current_date >= first):   #performing operations for each day (using 10 previoud days from current date) until first day i.e., 2023-01-01
    end_date = current_date- timedelta(days=10)    #to read files between these 10 days
    file_paths = [f"/mnt/container/{single_date.strftime('%Y/%m/%d')}/*.parquet" for single_date in pd.date_range(end_date, current_date, freq='D')]

    print(file_paths)
    #read files here
    #perform operations here
    
    current_date = current_date - timedelta(1)

enter image description here



  • current_date
    将是您在对数据框执行计算/操作后将数据写入的日期文件夹。
© www.soinside.com 2019 - 2024. All rights reserved.