摄取 avro 文件时的 Schema Evolution 是由 Event Hub Capture 编写的吗?

问题描述 投票:0回答:1

我有一个双跳摄取管道,如下:

数据生产者 -事件-> Azure 事件中心 -事件中心捕获-> Avro 格式的 ADLS -Autoloader-> Databricks UC 托管增量表

使用事件中心捕获功能时,将传入事件捕获到 ADLS 中的 Avro,我注意到它将事件存储在特定架构中,如下所示:

消息正文 抵消 序列号 分区 ID 排队时间 内容类型

读入 PySpark 数据帧时,我注意到消息正文是二进制格式的。将其转换为字符串会为我提供 JSON 格式的记录,然后我可以使用 StructType 模式解压这些记录。

我的问题是,假设我使用 Spark.readStream 和自动加载器读取数据并且它使用事件中心架构传入,这是否意味着我无法使用架构演化功能,因为它将在元数据中捕获此架构?

我的实际架构存储在消息正文列中,因此我猜不会选择附加架构?有什么办法解决这个问题吗?

非常感谢

pyspark databricks spark-structured-streaming azure-eventhub databricks-autoloader
1个回答
0
投票

由于架构在事件中心中是恒定的,因此不需要架构演变。即使您更改数据,它也会出现在

body
列中,该列是二进制类型,并在转换时转换为字符串;它仍然具有相同的架构。

因此,如果您有一个不改变的标准常量模式,您可以使用

from_json
函数加载数据。

schema = StructType([
    StructField("key1", StringType(), True),
    StructField("key2", StringType(), True),
    StructField("key3", StringType(), True),
    StructField("nestedKey", StructType([
        StructField("nestedKey1", StringType(), True)
    ]), True),
    StructField("arrayKey", ArrayType(StringType(), True), True)
])

df.select(F.from_json(F.col("body").cast("string"), schema=schema).alias("body")).display()

输出:

enter image description here

您还提到您在消息正文本身中有实际的架构,但在转换后它将是字符串类型,并且无法用于架构。

您还可以使用事件中心中的架构注册表来在消费者和生产者之间创建通用架构。请参阅以下文档了解更多信息。

创建 Azure 事件中心架构注册表 - Azure 事件中心 |微软学习

使用 Avro (Java) - Azure 事件中心验证来自 Apache Kafka 应用程序的事件微软学习

读写流式 Avro 数据 | AWS 上的 Databricks

© www.soinside.com 2019 - 2024. All rights reserved.