Pypark 附加分区会覆盖未分区的镶木地板

问题描述 投票:0回答:1

在 Azure Databricks 中,当我有一个未按某些列分区的镶木地板文件时。随后用partitionBy("some_column")追加一个新的数据帧,我原来的“未分区”数据帧的数据被覆盖。为什么会出现这种情况?追加不应该重新分区我的初始数据吗?或者至少发出数据被覆盖的警告?

## Example 1, only data from df2 is in file_path
# Init dataframe
df1.write.mode("overwrite").parquet(file_path)
# Append with partitionBy 
df2.write.mode("append").partitionBy("column1").parquet(file_path)

## Example 2, correctly appended data frome both frames is in file_path
# Init dataframe
df1.write.mode("overwrite").partitionBy("column1").parquet(file_path)
# Append with partitionBy 
df2.write.mode("append").partitionBy("column1").parquet(file_path)
pyspark databricks parquet
1个回答
0
投票

问题

出现示例 1 中观察到的行为是因为将具有不同分区的数据帧附加到现有 Parquet 文件不会触发原始数据的重新分区。相反,会在原始目录下创建新目录来存储分区。当 Spark 在写入第二个数据帧后读取 Parquet 时,您的原始元数据会丢失。

解决方案

当您尝试使用不同的分区进行写入时,其他一些实现(例如增量表)会引发错误。

此示例通过一些模拟数据展示了您的用例:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Create data
data1 = [(1, "Alice"),
        (2, "Bob"),
        (3, "Charlie")]

data2 = [(1, "Alice"),
        (2, "Bob"),
        (3, "Charlie"),
        (4, "Joe")]

# Create DataFrame
df1 = spark.createDataFrame(data1, schema)
df2 = spark.createDataFrame(data2, schema)

df1.write.mode("overwrite").format('delta').save('dbfs:/FileStore/testdelta')
# Append with partitionBy 
df2.write.mode("append").partitionBy("name").format('delta').save('dbfs:/FileStore/testdelta')

display(spark.read.format('delta').load('dbfs:/FileStore/testdelta'))

代码应引发异常,如下所示:

AnalysisException: Partition columns do not match the partition columns of the table.
Given: [`name`]
Table: []

防止您的数据丢失。

额外球

本文深入探讨内部 Parquet 文件结构和 Spark 阅读器:

© www.soinside.com 2019 - 2024. All rights reserved.