Spark DataFrame partition overwrite issue in Synapse Notebook: empty partitions are not overwritten

Problem description

I have run into a problem implementing Spark DataFrame partition overwrites in a Synapse Notebook environment. The specified account numbers are successfully removed from their partitions, but if a partition becomes empty as a result, it is not overwritten.

# Step 1: Set dynamic partition overwrite mode
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Define partitioned path
partitioned_path = "<partitioned_path>"

# Define partition columns
partition_columns = ["region", "partition_key"]

# Step 2: Create DataFrame with more data and partitions
data = [
    ("123", 10, "region1", "partition1"),
    ("456", 20, "region2", "partition2"),
    ("789", 30, "region1", "partition1"),
    ("234", 15, "region1", "partition1"),
    ("567", 25, "region2", "partition2"),
    ("890", 35, "region3", "partition3")
]
df = spark.createDataFrame(data, ["acc_no", "value", "region", "partition_key"])

# Step 3: Write DataFrame to Partitioned Path
df.write.partitionBy(*partition_columns).mode("overwrite").parquet(partitioned_path)

# Step 4: Read Partitioned Data and Filter
acc_nos_to_remove = ["456", "567", "123"]


try:
    # Read partitioned data
    partitioned_df = spark.read.option("basePath", partitioned_path).parquet(partitioned_path)

    # Filter DataFrame to remove specified account numbers
    filtered_df = partitioned_df.filter(~partitioned_df.acc_no.isin(acc_nos_to_remove))

    # Step 5: Update Partition
    filtered_df.write.format('parquet').option('header',True).mode('overwrite').option("maxPartitionBytes", 128*1024*1024).save(partitioned_path)
    print("Partition data updated successfully.")

except Exception as e:
    print(f"Error occurred while reading or writing Parquet files : {e}")

Note: this works fine in Databricks, but not in Synapse.

I expected that when the account numbers are removed, any partition that becomes empty as a result would be overwritten with the updated data. However, in the Synapse Notebook the empty partitions are not overwritten as expected; they appear to keep their previous data.
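For reference, a quick way to check which partition directories are still on disk after the rewrite (a sketch assuming the mssparkutils file-system helper that Synapse notebooks expose, with listing entries carrying path and isDir attributes, and the two-level region/partition_key layout written above):

# List the partition directories that remain under partitioned_path
for region_dir in mssparkutils.fs.ls(partitioned_path):
    if region_dir.isDir:
        for key_dir in mssparkutils.fs.ls(region_dir.path):
            if key_dir.isDir:
                print(key_dir.path)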

Any help or insight into resolving this would be appreciated.

python-3.x pyspark azure-synapse-pipeline
1 Answer

You can test this; I used it in my code and it works well:

from pyspark.sql import DataFrame

def delete_empty_partitions(df: DataFrame, partitioned_path: str, partition_columns: list):
    # Partition value combinations that still contain data after filtering
    active_partitions = df.select(partition_columns).distinct().collect()
    active_partition_paths = {
        partitioned_path.rstrip("/") + "/" + "/".join(f"{col}={row[col]}" for col in partition_columns)
        for row in active_partitions
    }
    # Walk one directory level per partition column
    # (dbutils on Databricks; use the equivalent filesystem utility on your platform)
    all_partition_paths = [partitioned_path]
    for _ in partition_columns:
        all_partition_paths = [entry.path.rstrip("/") for parent in all_partition_paths
                               for entry in dbutils.fs.ls(parent) if entry.isDir()]
    # Remove partition directories that no longer hold any data
    # (assumes partitioned_path is written in the same URI form the listing returns)
    empty_partitions = set(all_partition_paths) - active_partition_paths
    for partition in empty_partitions:
        dbutils.fs.rm(partition, recurse=True)

delete_empty_partitions(filtered_df, partitioned_path, partition_columns)
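dbutils is a Databricks utility and is not available in Synapse notebooks, so below is a sketch of the same clean-up adapted to mssparkutils, the file-system helper that Synapse provides (also importable via from notebookutils import mssparkutils). It assumes the entries returned by mssparkutils.fs.ls expose path and isDir attributes and that partitioned_path is written in the same URI form the listing returns; treat it as an outline rather than a drop-in implementation.

from notebookutils import mssparkutils  # available by default in Synapse notebooks

def delete_empty_partitions_synapse(df, partitioned_path, partition_columns):
    # Partition value combinations that still contain data after filtering
    base = partitioned_path.rstrip("/")
    active = {
        base + "/" + "/".join(f"{c}={row[c]}" for c in partition_columns)
        for row in df.select(partition_columns).distinct().collect()
    }
    # Walk one directory level per partition column
    current = [base]
    for _ in partition_columns:
        current = [entry.path.rstrip("/") for parent in current
                   for entry in mssparkutils.fs.ls(parent) if entry.isDir]
    # Delete partition directories whose values no longer appear in the DataFrame
    for partition in set(current) - active:
        mssparkutils.fs.rm(partition, True)  # second argument True removes recursively

delete_empty_partitions_synapse(filtered_df, partitioned_path, partition_columns)

Calling it right after the filtered write should clear out the directories that ended up with no remaining rows.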