Avro 模式演变(向后兼容)使用 pyspark 结构化流返回 null

问题描述 投票:0回答:2

Avro 向后兼容返回空记录。

我将一条由schema_ver1.avsc编码的记录和一条由schema_ver2.avsc编码的记录发送到kafka, 然后,我查询名为 avro_sink3‖decode by ver2 schema(avro_json_schema_ver2) 的 pyspark Streaming memory sink。

期待这样的记录

预期记录

+-----------------------------------+
|from_avro(value)                   |
+-----------------------------------+
|{ver1_yuki_schema, login, 4, 1}   | 
|{ver2_yuki_schema, login2, 4, 1000}| 
+-----------------------------------+

但是,我得到的输出如下所示。

实际输出1

+-----------------------------------+
|from_avro(value)                   |
+-----------------------------------+
|{null, null, null, null}           |   -> endoded by schema_ver1.avsc decoded by schema_ver2.avsc(record No.2)
|{ver2_yuki_schema, login2, 4, 1000}|  -> endoded by schema_ver2.avsc decoded by schema_ver2.avsc(record No.2)
+-----------------------------------+

我该如何解决这个问题。

pyspark 流水槽

memory_stream_check29 = df \
  .select("value").alias("value") \
  .writeStream \
  .format("memory") \
  .trigger(processingTime="5 seconds") \
  .option("checkpointLocation", "/tmp/kafka/avro_file11131/") \
  .queryName("avro_sink3") \
  .start()

查询内存接收器

spark.sql("select * from avro_sink3").select(from_avro("value",avro_json_schema_ver2, {"mode" : "PERMISSIVE"})).show(truncate=False)

schema_ver1.avsc

{
  "namespace": "root",
  "type": "record",
  "name": "Device",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "sendtime", "type": "int" },
  ]
}

schema_ver2.avsc

{
  "namespace": "root",
  "type": "record",
  "name": "Device",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "sendtime", "type": "int" },
    { "name": "temp", "type": "string", "default": "1" }
  ]
}


环境 spark3.2

apache-spark pyspark apache-kafka avro
2个回答
0
投票

from_avro 期望作者的模式可用于反序列化。您不能使用 avro_json_schema_ver2 反序列化使用 avro_json_schema_ver1 写入的记录。另一种解决方案是通过在记录头中使用模式标识符来反序列化使用所有模式,然后将正确的模式映射到每个记录。


0
投票

Spark 开源版本没有像我想象的那样实现 AVRO schema evolution。

你必须always找出特定作者的模式版本并在参数

jsonFormatSchema
中使用它。有一个名为 avroSchema
选项
,您可以使用它来实现伪模式演化。

我称之为伪模式演化,因为

jsonFormatSchema
必须始终是特定作者的模式版本,因此它将无法使用使用另一个版本序列化的 AVRO 数据。

因此,在您的情况下,您可以执行以下操作,它将处理版本 1 中的数据:

spark.sql("select * from avro_sink3").select(from_avro("value",avro_json_schema_ver1, {"mode": "PERMISSIVE", "avroSchema": "avro_json_schema_ver2"})).show(truncate=False)

但它不适用于版本 2 中的数据,即使它们都是兼容的。 因此您需要为每个特定版本提供两个不同的数据框,稍后您可以在两个数据框中应用

union
。请参阅:https://github.com/apache/spark/pull/27045https://github.com/apache/spark/pull/26780.

在我看来这很糟糕。

Databricks Runtime 12.1 及更高版本通过真正的模式演变实现了自己的特定版本,但仅适用于模式注册表。看: https://docs.databricks.com/structured-streaming/avro-dataframe.html#example-with-schema-registry

同样在这里,有人实现了这个问题的解决方案(我没有测试):

© www.soinside.com 2019 - 2024. All rights reserved.