在 DeltaTables 中,为什么带有 mergeSchema 的追加模式会创建存储中数据的完整副本?

问题描述 投票:0回答:1

我正在使用突触笔记本和 pyspark,并尝试以有效的方式支持模式演化。该格式不一定是 DeltaTables,但它似乎是增量数据加载的自然选择。

这是我正在使用的代码:

                new_delta = spark.read.format('delta').load(delta_table_path)
                new_delta.withColumn('test1', lit(None).cast('string')) \
                    .withColumn('test2', lit(None).cast('string')) \
                    .write.format('delta') \
                    .mode('append').option('mergeSchema', 'true') \
                    .save(delta_table_path)

结果是:

查看文件,旧文件具有旧架构,新文件具有新架构。

我的期望是使用追加模式和 mergeSchema 只会更新架构,而不会创建数据的完整历史记录。

运行真空将清理旧数据,但我想避免(如果可能)不必要地将大量数据写入存储。

将覆盖模式与 overwriteSchema 结合使用具有与上述相同的行为。

我是否误解了 DeltaTables 的工作原理?如果我想支持模式演化,我应该使用不同的方法吗?

pyspark parquet azure-synapse delta-lake
1个回答
0
投票

是的,您没有正确执行 - 您读取了所有数据,添加了列,然后以追加模式将所有数据写回,创建了数据的副本。

添加新列有不同的方法:

  • 您可以覆盖现有数据,而不是在

    append
    模式下写入数据。但这仍然会重复存储中的文件

  • 根据您使用的 Delta 表的版本,您可以使用内置函数来添加新列 - 您需要为此使用 SQL 语法(请参阅docs):

# Let's generate some data
>>> spark.range(10).write.format("delta").mode("append").saveAsTable("test")
>>> df = spark.read.table("test")
>>> df.printSchema()
root
 |-- id: long (nullable = true)
# Update table
>>> spark.sql("ALTER TABLE test ADD COLUMNS (col1 int, col2 string)")
DataFrame[]
# Check that schema changed and we see new columns
>>> df = spark.read.table("test")
>>> df.printSchema()
root
 |-- id: long (nullable = true)
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
>>> df.show(2)
+---+----+----+
| id|col1|col2|
+---+----+----+
|  3|null|null|
|  4|null|null|
+---+----+----+
  • 或者您可以简单地修改代码以添加新列,当您下次写入数据时,使用
    mergeSchema
    设置为
    true
    ,架构将被更新。 - 所谓的自动模式演化

我个人推荐使用第二种方法。

附注还可以重命名和删除列 - 您需要为此启用列映射。

© www.soinside.com 2019 - 2024. All rights reserved.