我正在 Azure Synapse Pipeline 环境中工作,我正在尝试初始化一个数组,其中包含日期“03-31”、“06-30”、“09-30”和“12-31”范围从 2018 年至今。因此,目前,这将是“2024-03-31”之前的所有季度日期。我正在构建它来执行休息 API 调用,其中我将传递数组中的每个日期以将报告拉取到我的 ADLS。我已经弄清楚了 ADLS 部分,这只是实际让这个阵列工作的问题。
我尝试了几种解决方案来使其正常工作,但我在突触方面的经验非常有限。我首先尝试创建一个 pyspark 笔记本,该笔记本将在管道开始时运行,并生成一个 .json,其中包含 ADLS 中所有必要的季度日期。我能够让它工作,但是当我将 .json 拉入管道时,我无法将其转换为数组变量。
我暂时决定手动生成季度日期数组。但我不禁期待有一个简单的方法来解决这个问题。看起来应该没那么复杂。
def getDateRange(start_date):
# Define start date and current date
current_date_str = spark.sql("SELECT CAST(current_date() AS STRING)").collect()[0][0]
print("Current Date:")
print(current_date_str)
end_year = int(current_date_str[:4]) + 1
# Generate a sequence of dates within the range up to the current date
date_range_df = spark.range(0, (365 * (end_year - 2018) + 1)).selectExpr("date_add('{}', CAST(id AS INT)) as date".format(start_date))
# Filter out dates beyond the current date
date_range_df = date_range_df.filter(col("date") <= current_date())
# Extract month and day
date_range_df = date_range_df.withColumn("month", expr("month(date)")).withColumn("day", expr("day(date)"))
# Filter dates to keep only the end of each quarter
quarterly_dates_df = date_range_df.filter(
(date_range_df.month == 3) & (date_range_df.day == 31) |
(date_range_df.month == 6) & (date_range_df.day == 30) |
(date_range_df.month == 9) & (date_range_df.day == 30) |
(date_range_df.month == 12) & (date_range_df.day == 31)
)
# Format dates as strings in 'MM-dd-yyyy' format
quarterly_dates_df = quarterly_dates_df.withColumn("date_string", date_format("date", "MM-dd-yyyy"))
return quarterly_dates_df
还有一段额外的代码可以将此数据输入 ADLS 系统,但它包含敏感信息。我没有构建的管道部分可供共享,但我试图进行查找,然后将输出存储到数组中,但该数组存储为单个元素而不是列表。
@json()
会将字符串值转换为Json本身。
我尝试将字符串值
'[1,2,3]
作为 @json()
表达式的输入,它给出了数组值作为输出 [1,2,3]
。
输入:
活动输出:
同样,您可以在 json 函数中给出查找活动的输出,以将查找活动的字符串输出转换为 Json 本身的数组。
示例表达式:
@json(activity('Lookup1').output.value)