在spark中如何获取镶木地板文件创建的时间戳作为列

问题描述 投票:0回答:1

在 Spark 中,在从 hdfs 读取文件时,对于每条记录,我想向 df 添加一列,其中包含读取记录的文件的文件创建时间戳。

例如 hdfs具有以下结构

/data/module/
|----------- file1.parquet
|----------- file2.parquet
|----------- file3.parquet
|----------- file4.parquet

在 Spark 中读取此目录时,我想为每条记录添加一列,该列应包含读取记录的文件的创建时间戳。

我尝试使用

df.withColumn("records_inserted_time", current_timestmap())

但这给出了所需的结果。

apache-spark hdfs parquet
1个回答
0
投票

可以通过文件名获取文件创建时间

from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, udf
from pyspark.sql.types import TimestampType
from datetime import datetime
import pyarrow.fs as fs

# Create Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Function to get file creation time
def get_file_creation_time(file_path):
    hdfs = fs.HadoopFileSystem()  # Create HadoopFileSystem instance
    file_info = hdfs.get_file_info(file_path)
    return datetime.utcfromtimestamp(file_info.mtime / 1000.0)  # Convert milliseconds to seconds and return UTC time

# Register UDF
get_file_creation_time_udf = udf(get_file_creation_time, TimestampType())

# Read files from HDFS
df = spark.read.parquet("hdfs:///data/module/")  # File path starts with hdfs://

# Add a new column with the file creation timestamp
df_with_timestamp = df.withColumn("file_created_timestamp", get_file_creation_time_udf(input_file_name()))

# Display the resulting DataFrame
df_with_timestamp.show(truncate=False)

© www.soinside.com 2019 - 2024. All rights reserved.