我正在 Synapse 笔记本中使用 PySpark,需要将 Parquet 文件加载到 DataFrame 中,应用一些转换(例如重命名列),然后将修改后的 DataFrame 保存回同一位置,覆盖原始文件。但是,当我尝试保存 DataFrame 时,它会创建一个名为 Test.parquet 的目录,其中包含两个文件:一个名为 SUCCESS,另一个包含一串随机字母和数字。
这是我正在使用的代码:
%%pyspark
df = spark.read.load('path/to/Test.parquet', format='parquet')
display(df.limit(10))
column_mapping = {
"FullName": "Full Name",
}
for old_col, new_col in column_mapping.items():
df = df.withColumnRenamed(old_col, new_col)
display(df.limit(10))
df.write.parquet('path/to/Test.parquet', mode='overwrite')
以下是如何覆盖文件:
如何正确覆盖原始 Parquet 文件而不创建额外的文件或目录?如有任何帮助,我们将不胜感激!
spark 仅保存单个 parquet 的问题在于其出于并发目的对数据表进行分区的本质。文件还通过 snappy 算法进行压缩。我尝试根据 databricks 文件系统提供答案。也许您需要根据您的需要调整文件路径。
你需要做的是:
coalsce(1)
合并 DataFrame,以在 df.write
命令创建的文件夹中实现一个文件os.listdir
操作识别单个parquet文件os.replace
shutil.rmtree
我为您提供了有关如何实现您想要的目标的答案。
from pyspark.sql import DataFrame
import os
import shutil
from typing import Union
from pathlib import Path
def get_local_path(path: Union[str, Path]) -> str:
"""
Transforms a potential dbfs path to a
path accessible by standard file system operations
:param path: Path to transform to local path
:return: The local path
"""
return str(path).replace("dbfs:", "/dbfs")
def save_as_one_parquet_file(
df: DataFrame,
output_file_path: str,
):
"""
Saves a spark dataframe as a single parquet file.
:param df: The spark dataframe to write.
:param output_file_path: The output filepath for writing the parquet file.
"""
localpath = get_local_path(output_file_path)
tmp_file_path = localpath + "_temp"
(
df.coalesce(1)
.write.mode("overwrite")
.format("parquet")
.save(output_file_path + "_temp")
)
file = [file for file in os.listdir(localpath + "_temp") if file.endswith(".parquet")][0]
os.replace(os.path.join(localpath + "_temp", file), localpath)
shutil.rmtree(tmp_file_path)
# Reading in
path = "dbfs:/mnt/dl2-temp-p-chn-1/test/Flights 1m.parquet"
df = spark.read.parquet(path)
# Transformations
df_new = df.withColumn("blub", f.lit(2))
# Saving as one parquet file
save_as_one_parquet_file(df_new, path)