我正在使用突触笔记本和 pyspark,并尝试以有效的方式支持模式演化。该格式不一定是 DeltaTables,但它似乎是增量数据加载的自然选择。
这是我正在使用的代码:
new_delta = spark.read.format('delta').load(delta_table_path)
new_delta.withColumn('test1', lit(None).cast('string')) \
.withColumn('test2', lit(None).cast('string')) \
.write.format('delta') \
.mode('append').option('mergeSchema', 'true') \
.save(delta_table_path)
查看文件,旧文件具有旧架构,新文件具有新架构。
我的期望是使用追加模式和 mergeSchema 只会更新架构,而不会创建数据的完整历史记录。
运行真空将清理旧数据,但我想避免(如果可能)不必要地将大量数据写入存储。
将覆盖模式与 overwriteSchema 结合使用具有与上述相同的行为。
我是否误解了 DeltaTables 的工作原理?如果我想支持模式演化,我应该使用不同的方法吗?
是的,您没有正确执行 - 您读取了所有数据,添加了列,然后以追加模式将所有数据写回,创建了数据的副本。
添加新列有不同的方法:
您可以覆盖现有数据,而不是在
append
模式下写入数据。但这仍然会重复存储中的文件
根据您使用的 Delta 表的版本,您可以使用内置函数来添加新列 - 您需要为此使用 SQL 语法(请参阅docs):
# Let's generate some data
>>> spark.range(10).write.format("delta").mode("append").saveAsTable("test")
>>> df = spark.read.table("test")
>>> df.printSchema()
root
|-- id: long (nullable = true)
# Update table
>>> spark.sql("ALTER TABLE test ADD COLUMNS (col1 int, col2 string)")
DataFrame[]
# Check that schema changed and we see new columns
>>> df = spark.read.table("test")
>>> df.printSchema()
root
|-- id: long (nullable = true)
|-- col1: integer (nullable = true)
|-- col2: string (nullable = true)
>>> df.show(2)
+---+----+----+
| id|col1|col2|
+---+----+----+
| 3|null|null|
| 4|null|null|
+---+----+----+
mergeSchema
设置为 true
,架构将被更新。 - 所谓的自动模式演化。我个人推荐使用第二种方法。
附注还可以重命名和删除列 - 您需要为此启用列映射。