无法使用架构注册表获取正确的架构

问题描述 投票:0回答:1

我正在使用 PySpark 中的 from_avro 函数以 Avro 格式从 Kafka 读取数据,并利用在模式注册表中注册的模式。但是,我遇到了一个问题,即架构注册表无法正确考虑批处理期间的架构更改。因此,当流作业开始时,它始终使用最新的架构,但它无法考虑流作业之间可能发生的任何架构更改。在理想情况下,架构注册表应考虑前 5 个字节中指定的架构 ID,以确保准确的架构解析。

data_df = (
    spark.readStream.format("kafka")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.bootstrap.servers", servers_details)
    .option("kafka.ssl.truststore.location", location)
    .option("kafka.ssl.truststore.password", pwd)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", 30)
    .option("subscribe", name)
    .load()
)

transform_df = (
    df.withColumn(
        "record",
        from_avro(
            fn.col("value"),
            schemaRegistryAddress="http://schema-registry.com",
            subject=f"{topic_name}-value",
        ),
    )
    .withColumn("schema_id", function_convert(fn.expr("substring(value, 2, 4)")))
    .select("schema_id", fn.col("record"))
)
display(transform_df)

我尝试了 from_avro 的选项,但似乎不起作用

transform_df = df.withColumn(
    "record",
    from_avro(
        fn.col("value"),
        options={"confluent.value.schema.validation": "true"},
        schemaRegistryAddress="http://schema-registry.com",
        subject=f"{topic_name}-value",
    ),
).select(fn.col("record").alias("RECORD_CONTENT"))

apache-spark pyspark databricks spark-streaming spark-avro
1个回答
0
投票

看起来您正在使用 PySpark 中的

from_avro
函数从 Kafka 读取 Avro 数据,并面临批处理期间未考虑架构更改的问题。不幸的是,PySpark 中的
from_avro
函数没有直接提供使用前 5 个字节指定模式解析的模式 ID 的选项。您尝试的
confluent.value.schema.validation
选项无法解决此特定场景。

但是,您可以通过使用架构注册表和 Avro 数据前 5 个字节中存在的架构 ID 手动解析 Avro 架构来解决此问题。这是实现这一目标的可能方法:

import io
import requests
from avro.schema import AvroSchema
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Function to fetch Avro schema from schema registry based on schema ID
def fetch_schema_from_registry(schema_registry_url, subject, schema_id):
    url = f"{schema_registry_url}/subjects/{subject}/versions/{schema_id}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()["schema"]

# Function to parse Avro data using the schema from the schema registry
def parse_avro_data(avro_data, schema_registry_url, subject):
    schema_id = int.from_bytes(avro_data[:5], "big")
    avro_schema_str = fetch_schema_from_registry(schema_registry_url, subject, schema_id)
    avro_schema = AvroSchema(avro_schema_str)
    return avro_schema.decode(io.BytesIO(avro_data[5:]))

# UDF to parse Avro data and apply schema resolution
parse_avro_udf = fn.udf(lambda value: parse_avro_data(value, "http://schema-registry.com", f"{topic_name}-value"))

data_df = (
    spark.readStream.format("kafka")
    # ... other Kafka options ...
    .load()
)

transform_df = data_df.withColumn("record", parse_avro_udf(fn.col("value")))

# Continue with your processing on `transform_df` as needed

在此方法中,我们定义一个 UDF (

parse_avro_udf
),它获取 Avro 数据,使用前 5 个字节中的模式 ID 从模式注册表中获取相应的模式,然后使用解析后的模式解析 Avro 数据。
fetch_schema_from_registry
函数负责根据模式ID从模式注册表中获取模式。

通过使用此 UDF,您应该能够在批处理期间处理架构更改,因为它将根据 Avro 数据中存在的架构 ID 解析每个记录的正确架构。

© www.soinside.com 2019 - 2024. All rights reserved.