I'm running into a problem implementing Spark DataFrame partition overwrite in a Synapse Notebook. The specified account numbers are successfully removed from their partitions, but if a partition becomes empty as a result, it is not overwritten.
# Step 1: Set dynamic partition overwrite mode
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# Define partitioned path
partitioned_path = "<partitioned_path>"
# Define partition columns
partition_columns = ["region", "partition_key"]
# Step 2: Create a sample DataFrame with multiple partitions
data = [
("123", 10, "region1", "partition1"),
("456", 20, "region2", "partition2"),
("789", 30, "region1", "partition1"),
("234", 15, "region1", "partition1"),
("567", 25, "region2", "partition2"),
("890", 35, "region3", "partition3")
]
df = spark.createDataFrame(data, ["acc_no", "value", "region", "partition_key"])
# Step 3: Write DataFrame to Partitioned Path
df.write.partitionBy(*partition_columns).mode("overwrite").parquet(partitioned_path)
# Step 4: Read Partitioned Data and Filter
acc_nos_to_remove = ["456", "567", "123"]
try:
    # Read partitioned data
    partitioned_df = spark.read.option("basePath", partitioned_path).parquet(partitioned_path)
    # Filter DataFrame to remove specified account numbers
    filtered_df = partitioned_df.filter(~partitioned_df.acc_no.isin(acc_nos_to_remove))
    # Step 5: Update Partition
    filtered_df.write.format('parquet').option('header', True).mode('overwrite').option("maxPartitionBytes", 128*1024*1024).save(partitioned_path)
    print("Partition data updated successfully.")
except Exception as e:
    print(f"Error occurred while reading or writing Parquet files: {e}")
Note: this works fine in Databricks, but not in Synapse.
I expected that when account numbers are removed from a partition, any partition that becomes empty would be overwritten with the updated data. However, in the Synapse Notebook the empty partitions are not overwritten as expected; instead, they seem to retain their previous data.
Any help or insight on resolving this would be appreciated.
You can test this; I use it in my code and it works well:
from pyspark.sql import DataFrame

def delete_empty_partitions(df: DataFrame, partitioned_path: str, partition_columns: list):
    # Partition combinations that still have rows after filtering
    active_partitions = df.select(partition_columns).distinct().collect()
    active_partition_paths = {
        f"{partitioned_path.rstrip('/')}/" + "/".join(f"{col}={row[col]}" for col in partition_columns)
        for row in active_partitions
    }
    # Walk one directory level per partition column so we compare leaf
    # partition directories (region=.../partition_key=...), not just the
    # top level. In Synapse, replace dbutils.fs with mssparkutils.fs.
    leaf_dirs = [partitioned_path]
    for _ in partition_columns:
        leaf_dirs = [f.path for d in leaf_dirs for f in dbutils.fs.ls(d) if f.isDir()]
    # Listings return paths with a trailing slash; strip it before comparing
    empty_partitions = {p.rstrip("/") for p in leaf_dirs} - active_partition_paths
    for partition in empty_partitions:
        dbutils.fs.rm(partition, recurse=True)

delete_empty_partitions(filtered_df, partitioned_path, partition_columns)
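The subtle part of this approach is comparing the paths returned by the filesystem listing against the paths built from DataFrame rows: listings typically append a trailing "/", so an unnormalized set difference matches nothing. As a minimal sketch (the function name and arguments are illustrative, not part of any Spark API), that comparison logic can be isolated and unit-tested without a cluster:

```python
def find_empty_partition_paths(listed_dirs, active_rows, partition_columns, base_path):
    """Return leaf partition directories with no surviving rows.

    listed_dirs: leaf partition directory paths from a filesystem listing
    active_rows: mappings of partition column -> value still present in the data
    """
    base = base_path.rstrip("/")
    # Rebuild the Hive-style path (col=value/col=value) for each active partition
    active = {
        base + "/" + "/".join(f"{c}={row[c]}" for c in partition_columns)
        for row in active_rows
    }
    # Normalize trailing slashes that filesystem listings usually include
    listed = {p.rstrip("/") for p in listed_dirs}
    return sorted(listed - active)

listed = [
    "/data/region=region1/partition_key=partition1/",
    "/data/region=region2/partition_key=partition2/",
    "/data/region=region3/partition_key=partition3/",
]
active = [
    {"region": "region1", "partition_key": "partition1"},
    {"region": "region3", "partition_key": "partition3"},
]
print(find_empty_partition_paths(listed, active, ["region", "partition_key"], "/data/"))
# → ['/data/region=region2/partition_key=partition2']
```

Only the directories in the result would then be deleted with the filesystem utility available in your environment (dbutils.fs.rm in Databricks, mssparkutils.fs.rm in Synapse).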