我有一个双跳摄取管道,如下:
数据生产者 -事件-> Azure 事件中心 -事件中心捕获-> Avro 格式的 ADLS -Autoloader-> Databricks UC 托管增量表
使用事件中心捕获功能时,将传入事件捕获到 ADLS 中的 Avro,我注意到它将事件存储在特定架构中,如下所示:
消息正文 抵消 序列号 分区 ID 排队时间 内容类型
读入 PySpark 数据帧时,我注意到消息正文是二进制格式的。将其转换为字符串会为我提供 JSON 格式的记录,然后我可以使用 StructType 模式解压这些记录。
我的问题是,假设我使用 Spark.readStream 和自动加载器读取数据并且它使用事件中心架构传入,这是否意味着我无法使用架构演化功能,因为它将在元数据中捕获此架构?
我的实际架构存储在消息正文列中,因此我猜不会选择附加架构?有什么办法解决这个问题吗?
非常感谢
由于架构在事件中心中是恒定的,因此不需要架构演变。即使您更改数据,它也会出现在
body
列中,该列是二进制类型,并在转换时转换为字符串;它仍然具有相同的架构。
因此,如果您有一个不改变的标准常量模式,您可以使用
from_json
函数加载数据。
schema = StructType([
StructField("key1", StringType(), True),
StructField("key2", StringType(), True),
StructField("key3", StringType(), True),
StructField("nestedKey", StructType([
StructField("nestedKey1", StringType(), True)
]), True),
StructField("arrayKey", ArrayType(StringType(), True), True)
])
df.select(F.from_json(F.col("body").cast("string"), schema=schema).alias("body")).display()
输出:
您还提到您在消息正文本身中有实际的架构,但在转换后它将是字符串类型,并且无法用于架构。
您还可以使用事件中心中的架构注册表来在消费者和生产者之间创建通用架构。请参阅以下文档了解更多信息。
创建 Azure 事件中心架构注册表 - Azure 事件中心 |微软学习