Spark 基于多个分区(即 DATE_KEY 和 BASE_FEED)读取镶木地板文件

问题描述 投票:0回答:1

我正在使用 PySpark 从按 DATE_KEY 分区的 HDFS 位置读取镶木地板文件。以下代码始终从 MAX(DATE_KEY) 分区读取文件并转换为 Polars 数据帧。

def hdfs_fetch_latest_parquet_file(parquet_file_name):
    sc = SparkSession.builder.appName("hdfs-spark").config("spark.sql.execution.arrow.pyspark.enabled", "true").master("local[*]").config("spark.executor.memory", "70g").config("spark.driver.memory", "50g").config("spark.memory.offHeap.enabled", "true").config("spark.memory.offHeap.size", "16g").config("spark.driver.maxResultSize", "4g").config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED").config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED").config("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED").config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED").getOrCreate()   
    spark_df = sc.read.parquet(config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name + "/").createOrReplaceTempView("allDateKeysParquet")
    spark_df_latest_date_key = sc.sql('select * from allDateKeysParquet where DATE_KEY = (select MAX(DATE_KEY) from allDateKeysParquet)')
    df = pl.from_arrow(pa.Table.from_batches(spark_df_latest_date_key._collect_as_arrow())).drop('DATE_KEY')
    return df

如果其中没有任何子分区,则此方法可以很好地找到最大 DATE_KEY,即: enter image description here

但挑战是,如果 DATE_KEY 进一步具有基于 BASE_FEED 的子分区,则代码会失败。

enter image description here

我的目标是读取 MAX(DATE_KEY) 内的镶木地板文件,如果其中包含子文件夹,则也读取其中的所有内容。

我尝试使用下面的代码,但它有例外

spark_df = sc.read.parquet(config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name + "/*").createOrReplaceTempView("allDateKeysParquet")

检测到冲突的目录结构。可疑路径

有没有其他方法可以解决这个问题,以便代码始终找到 MAX(DATE_KEY) 分区并读取其中的所有镶木地板文件,而不管 date_key 包含更多分区(即本例中的 BASE_FEED)?

有人可以帮我吗?

apache-spark pyspark hdfs python-polars pyarrow
1个回答
0
投票

“/*”位于镶木地板的路径上,您应该只访问目录而不是单个文件,在本例中为 REFERENCE_DATA.parquet

© www.soinside.com 2019 - 2024. All rights reserved.