使用文件名创建变量-PySpark

问题描述 投票:0回答:2

我有一个文件夹(每天,每周),文件被丢弃,我需要以相同的格式在文件名中添加年和周/日,作为数据框的变量。前缀可以更改(例如sales_reportcash_flow等),但最后一个字符始终为YYYY_WW.csv

例如,对于每周一次的文件,我可以手动为每个文件执行以下操作:

from pyspark.sql.functions import lit

df = spark.read.load('my_folder/sales_report_2019_12.csv', format="csv").withColumn("sales_year", lit(2019)).withColumn("sales_week", lit(12))

我想做一个等效的事情,即使用从文件名右边开始计数的子字符串函数来解析122019。我能够解析这些变量的文件名,然后可以使用通配符(例如df = spark.read.load('my_folder/sales_report_*.csv', format="csv"))读取文件夹中的所有文件,这将大大简化我的代码。

pyspark pyspark-sql pyspark-dataframes
2个回答
1
投票

您可以使用input_file_name()列和某些字符串函数,例如regexp_extractsubstring_index,从文件名中轻松提取它:

df = spark.read.load('my_folder/*.csv', format="csv")

df = df.withColumn("year_week", regexp_extract(input_file_name(), "\d{4}_\d{1,2}"))\
       .withColumn("sales_year", substring_index(col("year_week"), "_", 1))\
       .withColumn("sales_week", substring_index(col("year_week"), "_", -1))\
       .drop("year_week")

1
投票

您可以尝试以下方法:

import glob
listfiles = glob.glob('my_folder/sales_report_*.csv')
for files in listfiles:
    weekyear = c.split('_',2)[-1].split('_')
    week = weekyear[1]
    year = weekyear[0]
    df = spark.read.load('files', format="csv").withColumn("sales_year", lit(year)).withColumn("sales_week", lit(week))
© www.soinside.com 2019 - 2024. All rights reserved.