我正在使用 PySpark 中的 from_avro 函数以 Avro 格式从 Kafka 读取数据,并利用在模式注册表中注册的模式。但是,我遇到了一个问题,即架构注册表无法正确考虑批处理期间的架构更改。因此,当流作业开始时,它始终使用最新的架构,但它无法考虑流作业之间可能发生的任何架构更改。在理想情况下,架构注册表应考虑前 5 个字节中指定的架构 ID,以确保准确的架构解析。
data_df = (
spark.readStream.format("kafka")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("kafka.security.protocol", "SSL")
.option("kafka.bootstrap.servers", servers_details)
.option("kafka.ssl.truststore.location", location)
.option("kafka.ssl.truststore.password", pwd)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", 30)
.option("subscribe", name)
.load()
)
transform_df = (
df.withColumn(
"record",
from_avro(
fn.col("value"),
schemaRegistryAddress="http://schema-registry.com",
subject=f"{topic_name}-value",
),
)
.withColumn("schema_id", function_convert(fn.expr("substring(value, 2, 4)")))
.select("schema_id", fn.col("record"))
)
display(transform_df)
我尝试了 from_avro 的选项,但似乎不起作用
transform_df = df.withColumn(
"record",
from_avro(
fn.col("value"),
options={"confluent.value.schema.validation": "true"},
schemaRegistryAddress="http://schema-registry.com",
subject=f"{topic_name}-value",
),
).select(fn.col("record").alias("RECORD_CONTENT"))
看起来您正在使用 PySpark 中的
from_avro
函数从 Kafka 读取 Avro 数据,并面临批处理期间未考虑架构更改的问题。不幸的是,PySpark 中的 from_avro
函数没有直接提供使用前 5 个字节指定模式解析的模式 ID 的选项。您尝试的 confluent.value.schema.validation
选项无法解决此特定场景。
但是,您可以通过使用架构注册表和 Avro 数据前 5 个字节中存在的架构 ID 手动解析 Avro 架构来解决此问题。这是实现这一目标的可能方法:
import io
import requests
from avro.schema import AvroSchema
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
# Function to fetch Avro schema from schema registry based on schema ID
def fetch_schema_from_registry(schema_registry_url, subject, schema_id):
url = f"{schema_registry_url}/subjects/{subject}/versions/{schema_id}"
response = requests.get(url)
response.raise_for_status()
return response.json()["schema"]
# Function to parse Avro data using the schema from the schema registry
def parse_avro_data(avro_data, schema_registry_url, subject):
schema_id = int.from_bytes(avro_data[:5], "big")
avro_schema_str = fetch_schema_from_registry(schema_registry_url, subject, schema_id)
avro_schema = AvroSchema(avro_schema_str)
return avro_schema.decode(io.BytesIO(avro_data[5:]))
# UDF to parse Avro data and apply schema resolution
parse_avro_udf = fn.udf(lambda value: parse_avro_data(value, "http://schema-registry.com", f"{topic_name}-value"))
data_df = (
spark.readStream.format("kafka")
# ... other Kafka options ...
.load()
)
transform_df = data_df.withColumn("record", parse_avro_udf(fn.col("value")))
# Continue with your processing on `transform_df` as needed
在此方法中,我们定义一个 UDF (
parse_avro_udf
),它获取 Avro 数据,使用前 5 个字节中的模式 ID 从模式注册表中获取相应的模式,然后使用解析后的模式解析 Avro 数据。 fetch_schema_from_registry
函数负责根据模式ID从模式注册表中获取模式。
通过使用此 UDF,您应该能够在批处理期间处理架构更改,因为它将根据 Avro 数据中存在的架构 ID 解析每个记录的正确架构。