在 Spark 中,在从 hdfs 读取文件时,对于每条记录,我想向 df 添加一列,其中包含读取记录的文件的文件创建时间戳。
例如 hdfs具有以下结构
/data/module/
|----------- file1.parquet
|----------- file2.parquet
|----------- file3.parquet
|----------- file4.parquet
在 Spark 中读取此目录时,我想为每条记录添加一列,该列应包含读取记录的文件的创建时间戳。
我尝试使用
df.withColumn("records_inserted_time", current_timestmap())
但这给出了所需的结果。
可以通过文件名获取文件创建时间
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, udf
from pyspark.sql.types import TimestampType
from datetime import datetime
import pyarrow.fs as fs
# Create Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Function to get file creation time
def get_file_creation_time(file_path):
hdfs = fs.HadoopFileSystem() # Create HadoopFileSystem instance
file_info = hdfs.get_file_info(file_path)
return datetime.utcfromtimestamp(file_info.mtime / 1000.0) # Convert milliseconds to seconds and return UTC time
# Register UDF
get_file_creation_time_udf = udf(get_file_creation_time, TimestampType())
# Read files from HDFS
df = spark.read.parquet("hdfs:///data/module/") # File path starts with hdfs://
# Add a new column with the file creation timestamp
df_with_timestamp = df.withColumn("file_created_timestamp", get_file_creation_time_udf(input_file_name()))
# Display the resulting DataFrame
df_with_timestamp.show(truncate=False)