我正在尝试使用以下指令访问 _metadata 以获取文件修改时间: https://docs.databricks.com/en/ingestion/file-metadata-column.html
这是我的代码:
df = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='TAG2') \
.options(nullValue='') \
.load(xmlFile) \
.select("*", "_metadata")
这在我加载 csv 文件时有效,但不适用于 XML 文件。我收到错误消息,指出没有这样的列。
我确信加载 XML 内容的代码运行良好。
XML 文件不支持此功能还是我做错了什么?
由于
com.databricks.spark.xml
类可能不支持添加_metadata
列,并且确认兴趣的值是文件的file_modification_time
,因此可以使用dbutils.fs.ls("<path to the folder containing the file>")
。[FileInfo(path='dbfs:/tmp/my_file.txt', name='my_file.txt', size=40, modificationTime=1622054945000)]
modificationTime
采用unix时间戳格式。file_modification_time
转换添加 from_unixtime
:
import pyspark.sql.functions as f
file_list = dbutils.fs.ls("<path to the folder containing the file>")
file_modification_unix_time = file_list[0].modificationTime
df = df.withColumn('file_modification_time', f.from_unixtime(f.lit(file_modification_unix_time)))
如果目录中有多个文件,您可以从
dbutils.fs.ls
的输出创建一个数据帧(我们称之为 file_list_df
,使用 df
转换为 input_file_name()
的每个记录添加文件路径,然后加入 df
与 file_list_df
:
import pyspark.sql.functions as f
file_list = dbutils.fs.ls("<path to the folder containing the file>")
file_list_df = (
spark.createDataFrame([
(file_elemen.path, file_element.modificationTime) for file_element in file_list
], ['filePath', 'modificationTime'])
.withColumn('file_modification_time', f.from_unixtime(f.col('modificationTime')))
)
df = (
df
.withColumn('filePath', f.input_file_name())
.join(file_list_df, ['filePath'])
.select(df['*'], file_list_df['file_modification_time'])
)