我正在尝试将 Databricks Autoloader 用于一个非常简单的用例:
从 S3 读取 JSON 并将其加载到增量表中,并进行模式推断和演化。
这是我的代码:
self.spark \
.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
.load(f"{self.source_s3_bucket}/{source_table_name}") \
.distinct() \
.writeStream \
.trigger(availableNow=True) \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
.option("streamName", source_table_name) \
.start(f"{self.target_s3_bucket}/{target_table_name}")
当带有未知列的 JSON 到达时,流会如预期失败,并出现
NEW_FIELDS_IN_RECORD_WITH_FILE_PATH
异常。
但是当我重试该作业时,出现以下异常:
StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.
这是我第一次使用 Autoloader,我是否做错了什么?
考虑定义显式模式,而不是完全依赖模式推断。这可以帮助更可预测地管理模式演变。如果传入数据的架构发生变化,您可以相应地更新您的显式架构。
错误消息表明状态模式不匹配。这可能是由于运行之间推断的模式存在差异造成的。检查保存在
_schema
位置的 schema 并将其与当前 JSON schema 进行比较。
此外,还为模式演化添加一些错误处理。当检测到新字段时,可以通过记录错误并继续流来优雅地处理它。
定义显式模式将是,例如:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define your schema explicitly
schema = StructType([
StructField("fieldName1", StringType(), True),
StructField("fieldName2", IntegerType(), True),
# Add more fields as needed
])
# Use the defined schema in the readStream
self.spark \
.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schema", schema) # Use the explicit schema
.option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
.load(f"{self.source_s3_bucket}/{source_table_name}") \
.distinct() \
.writeStream \
.trigger(availableNow=True) \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
.option("streamName", source_table_name) \
.start(f"{self.target_s3_bucket}/{target_table_name}")
另一种方法是使用 .option("cloudFiles.schemaHints", "your_hints_here")
提供
模式提示。当您知道某些列应该具有特定的数据类型或想要添加开始时不存在的列时,这非常有用。
此外,Autoloader 还可以在 JSON blob 列中“拯救”意外数据,您可以稍后访问。该功能对于处理类型不匹配和其他不一致非常有用。
使用模式提示将是:
self.spark \
.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
.option("cloudFiles.schemaHints", "your_schema_hints_here") # Add schema hints as needed
.load(f"{self.source_s3_bucket}/{source_table_name}") \
.distinct() \
.writeStream \
.trigger(availableNow=True) \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
.option("streamName", source_table_name) \
.start(f"{self.target_s3_bucket}/{target_table_name}")
cloudFiles.schemaEvolutionMode
设置。默认 addNewColumns
模式会停止流并向架构添加新列,但 rescue
和 failOnNewColumns
等其他模式可用于不同的行为。