如何使用 PySpark 覆盖同一位置的 Parquet 文件

问题描述 投票:0回答:1

我正在 Synapse 笔记本中使用 PySpark,需要将 Parquet 文件加载到 DataFrame 中,应用一些转换(例如重命名列),然后将修改后的 DataFrame 保存回同一位置,覆盖原始文件。但是,当我尝试保存 DataFrame 时,它会创建一个名为 Test.parquet 的目录,其中包含两个文件:一个名为 SUCCESS,另一个包含一串随机字母和数字。

这是我正在使用的代码:

%%pyspark
df = spark.read.load('path/to/Test.parquet', format='parquet')
display(df.limit(10))

column_mapping = {
    "FullName": "Full Name",
}

for old_col, new_col in column_mapping.items():
    df = df.withColumnRenamed(old_col, new_col)
    display(df.limit(10))
df.write.parquet('path/to/Test.parquet', mode='overwrite')

以下是如何覆盖文件:

enter image description here

如何正确覆盖原始 Parquet 文件而不创建额外的文件或目录?如有任何帮助,我们将不胜感激!

pyspark azure-synapse azure-synapse-analytics azure-notebooks
1个回答
0
投票

spark 仅保存单个 parquet 的问题在于其出于并发目的对数据表进行分区的本质。文件还通过 snappy 算法进行压缩。我尝试根据 databricks 文件系统提供答案。也许您需要根据您的需要调整文件路径。

你需要做的是:

  1. 通过
    coalsce(1)
    合并 DataFrame,以在
    df.write
    命令创建的文件夹中实现一个文件
  2. 在spark制作的文件夹中通过
    os.listdir
    操作识别单个parquet文件
  3. 将其替换为您最初读取的目录
    os.replace
  4. 删除由spark创建的文件夹及其所有包含子文件的
    shutil.rmtree

我为您提供了有关如何实现您想要的目标的答案。


from pyspark.sql import DataFrame
import os
import shutil
from typing import Union
from pathlib import Path


def get_local_path(path: Union[str, Path]) -> str:
    """
    Transforms a potential dbfs path to a
    path accessible by standard file system operations

    :param path: Path to transform to local path
    :return: The local path
    """
    return str(path).replace("dbfs:", "/dbfs")

def save_as_one_parquet_file(
    df: DataFrame,
    output_file_path: str,
):
    """
    Saves a spark dataframe as a single parquet file.

    :param df: The spark dataframe to write.
    :param output_file_path: The output filepath for writing the parquet file.
    """

    localpath = get_local_path(output_file_path)
    tmp_file_path = localpath + "_temp"
    (
        df.coalesce(1)
        .write.mode("overwrite")
        .format("parquet")
        .save(output_file_path + "_temp")
    )

    file = [file for file in os.listdir(localpath + "_temp") if file.endswith(".parquet")][0]
    os.replace(os.path.join(localpath + "_temp", file), localpath)
    shutil.rmtree(tmp_file_path)

# Reading in
path = "dbfs:/mnt/dl2-temp-p-chn-1/test/Flights 1m.parquet"
df = spark.read.parquet(path)

# Transformations
df_new = df.withColumn("blub", f.lit(2))

# Saving as one parquet file
save_as_one_parquet_file(df_new, path)

© www.soinside.com 2019 - 2024. All rights reserved.