我有具有不断发展的模式的镶木地板文件,我需要将它们全部加载到单个增量表中。我的目标是使用 Autoloader 和
schemaEvolutionMode
="rescue
" (因此源中与目标模式不对齐的所有字段都应落入“_rescued_data
”列)。我还为自动加载器提供 .schema(target_schema)
。
但是当我读取某些文件时,我收到此错误:
Invalid Spark read type: expected optional group my_column (LIST)
{ repeated group list { optional binary element (STRING); } }
to be list but found Some(StringType)
my_column
目标表中的数据类型为 String。
那么为什么它没有加载到
_rescued_data
列并引发错误?
我正在使用的代码:
read_options = {
"cloudFiles.format": "parquet",
"cloudFiles.schemaLocation: "some location",
"cloudFiles.schemaEvolutionMode": "rescue"
}
spark.readStream.format("cloudFiles")
.options(**read_options)
.schema(target_schema)
.load("source_path")
.foreachBatch(<save function>)
.outputMode("append")
.trigger("availableNow", True)
.start()
Databricks 版本是 13.2(Spark 3.4.0、Scala 2.12)
您收到的错误消息表明 Parquet 文件中的列与预期架构不匹配,并且它没有按预期落入“_rescued_data”列。
例如,在 Spark Streaming 作业中使用 Delta Lake 和 Autoloader 时,我使用了具有模式演化的 CSV 文件。
以下代码执行具有模式推断和模式演化模式的自动加载器“
rescue
”
在您不想停止直播的场景中。 新列必须被拯救列,然后应用转换
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaEvolutionMode", "rescue")
.option("cloudFiles.schemaLocation", schema_loc)
.load(source_location)
)
query = (df.writeStream
.format("delta")
.option("checkpointLocation", checkpoints_loc)
.outputMode("overwrite")
.option("mergeSchema", "true")
.start(target_location)
)
启用模式推断
.option("cloudFiles.inferColumnTypes", "true")
模式演化模式
.option("cloudFiles.schemaEvolutionMode", "rescue")
模式演变
.option("mergeSchema", "true")
在您不想停止直播的场景中。 新的cols必须拯救col并随后应用转型
SparkSession.builder.appName("DeltaWithAutomaticRetries").config("spark.databricks.delta.schema.autoMerge.enabled", "true").getOrCreate()
上述代码将启用 Delta Lake 中模式演化的自动重试
了解更多自动加载器架构演变如何工作?