Spark 流使用 Abris 从 Kafka 读取,并且最新的架构注册表不会同步

问题描述 投票:0回答:1

我有一个 Spark 流,它从 Kafka Avro 消息中读取并根据最新版本的架构生成数据帧。我正在使用 abris 来执行此操作,看起来就像这样。

import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config{AbrisConfig, FromAvroConfig}

val abrisConfig = AbrisConfig.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andRecordNameStrategy(schemaName, recordNamespace)
.usingSchemaRegistry(schemaRegistryUrl)

然后我可以根据这个 abrisConfig 读取我的 Avro 消息到 Kafka。

如您所见,我想要获取最新版本的架构,但是当流在 Spark 集群上运行并基于此 abris 配置从 Kafka 读取时,它会“卡住”他第一次启动时下载的最新架构版本运行。

因此,如果我的架构已更新,使用 abris 配置从 Kafka 读取的流不会与新的最新架构版本“刷新/同步”,因此,如果新架构有新列,我可能会丢失它。

关于如何告诉 Abris 每批自动刷新有什么建议吗?

scala apache-spark apache-kafka spark-structured-streaming confluent-schema-registry
1个回答
0
投票

不建议“刷新”,因为 Dataframe 架构将在应用程序初始化时创建一次。这意味着即使 Avro 架构在主题中发生更改,新字段也会被删除或映射回消费者应用程序的架构,直到重新启动为止。

另一种方法是定义您自己的反序列化器,解析为

GenericRecord
,然后将其有效地视为 JSON,如我的答案所示,但即使使用这种方法,您仍然需要将代码修改为有选择地提取任何生产者添加的某些字段。

© www.soinside.com 2019 - 2024. All rights reserved.