我有一个 Spark 流,它从 Kafka Avro 消息中读取并根据最新版本的架构生成数据帧。我正在使用 abris 来执行此操作,看起来就像这样。
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config{AbrisConfig, FromAvroConfig}
val abrisConfig = AbrisConfig.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andRecordNameStrategy(schemaName, recordNamespace)
.usingSchemaRegistry(schemaRegistryUrl)
然后我可以根据这个 abrisConfig 读取我的 Avro 消息到 Kafka。
如您所见,我想要获取最新版本的架构,但是当流在 Spark 集群上运行并基于此 abris 配置从 Kafka 读取时,它会“卡住”他第一次启动时下载的最新架构版本运行。
因此,如果我的架构已更新,使用 abris 配置从 Kafka 读取的流不会与新的最新架构版本“刷新/同步”,因此,如果新架构有新列,我可能会丢失它。
关于如何告诉 Abris 每批自动刷新有什么建议吗?
不建议“刷新”,因为 Dataframe 架构将在应用程序初始化时创建一次。这意味着即使 Avro 架构在主题中发生更改,新字段也会被删除或映射回消费者应用程序的架构,直到重新启动为止。
另一种方法是定义您自己的反序列化器,解析为
GenericRecord
,然后将其有效地视为 JSON,如我的答案所示,但即使使用这种方法,您仍然需要将代码修改为有选择地提取任何生产者添加的某些字段。