Error in PySpark when deserializing Avro data from Kafka with Apicurio


I am working in PySpark, using Spark Structured Streaming to consume Avro-encoded data (serialized through Apicurio) from a Kafka topic. I want to deserialize it, apply transformations, and then save it to Hive. If I do not set the mode to PERMISSIVE, I run into the following error:

Error: "Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'."

If I set the mode to PERMISSIVE, then I get all null values, as shown below:

-------------------------------------------
Batch: 8
-------------------------------------------
+------+-----+------+----+-----+-----------+
|before|after|source|op  |ts_ms|transaction|
+------+-----+------+----+-----+-----------+
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
+------+-----+------+----+-----+-----------+

If I print my schema, it shows that it is read correctly:

{"type":"record","name":"Envelope","namespace":"namespace","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"ID","type":"double"},{"name":"name","type":["null","string"],"default":null}],"connect.name":"Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.oracle","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","string"],"default":null},{"name":"scn","type":["null","string"],"default":null},{"name":"commit_scn","type":["null","string"],"default":null},{"name":"lcr_position","type":["null","string"],"default":null},{"name":"rs_id","type":["null","string"],"default":null},{"name":"ssn","type":["null","int"],"default":null},{"name":"redo_thread","type":["null","int"],"default":null}],"connect.name":"io.debezium.connector.oracle.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"connectname"}

Interestingly, I am passing the schema registry URL, so I don't think the schema is wrong. The code I am using is as follows:


# pyspark imports
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.types import StringType, TimestampType, IntegerType, BinaryType

# schema registry imports
from confluent_kafka.schema_registry import SchemaRegistryClient
import time
import pandas as pd
# import json
from pyspark.conf import SparkConf

#kafka_url = "kafka-broker:29092"
schema_registry_url = "http://localhost:8082/apis/ccompat/v6"
kafka_producer_topic = "topic"
#kafka_analyzed_topic = "wikimedia.processed"
schema_registry_subject = f"{kafka_producer_topic}-value"
#schema_registry_analyzed_data_subject = f"{kafka_analyzed_topic}-value"

KAFKA_TOPIC_NAME = "topic"
kafka_bootstrap_servers = 'ip:9092'

# UDF function
binary_to_string_udf = func.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)
    return sr, latest_version

if __name__ == "__main__":
    print("Welcome Here")
    print("Data Processing App Start Form Here")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))


    conf = SparkConf().setAppName("MyApp002").setMaster("local[*]") \
            .set("spark.executor.instances", "10")\
            .set("spark.executor.memory", "28g") \
            .set("spark.executor.cores", "16") 

    spark = SparkSession.builder.master("local[*]").appName("Pyspark App to Load WSLOG From Raw To Curated In Realtime").config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark.sql.shuffle.partitions", "2")
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  


    wikimedia_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", KAFKA_TOPIC_NAME) \
        .option("startingOffsets", "earliest") \
        .option("spark.streaming.kafka.maxRatePerPartition", "50") \
        .load()
    
    wikimedia_df.printSchema()


    # remove first 5 bytes from value
    wikimedia_df = wikimedia_df.withColumn('fixedValue', func.expr("substring(value, 6, length(value)-5)"))

    #  get schema id from value
    #wikimedia_df = wikimedia_df.withColumn('valueSchemaId', binary_to_string_udf(func.expr("substring(value, 2, 4)")))
    #print(wikimedia_df)

    # get schema using subject name
    _, latest_version_wikimedia = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
    print(latest_version_wikimedia.schema.schema_str)
    

    #deserialize data 
    fromAvroOptions = {"mode":"PERMISSIVE"}
    decoded_output = wikimedia_df.select(
        from_avro(
            func.col("fixedValue"), latest_version_wikimedia.schema.schema_str, fromAvroOptions
        )
        .alias("wikimedia")
    )
    wikimedia_value_df = decoded_output.select("wikimedia.*")
    wikimedia_value_df.printSchema()

    wikimedia_value_df \
        .writeStream \
        .format("console") \
        .trigger(processingTime='1 second') \
        .outputMode("append") \
        .option("truncate", "false") \
        .start() \
        .awaitTermination()
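
For the "save it to Hive" step mentioned at the start, a common pattern is a foreachBatch sink instead of the console sink. A rough sketch, not part of the original code: the table name hive_db.wikimedia_curated and the checkpoint path are placeholders, and the SparkSession would need enableHiveSupport():

def write_to_hive(batch_df, batch_id):
    # Append each micro-batch to a Hive-managed table.
    batch_df.write.mode("append").format("hive").saveAsTable("hive_db.wikimedia_curated")

wikimedia_value_df \
    .writeStream \
    .foreachBatch(write_to_hive) \
    .option("checkpointLocation", "/tmp/wikimedia_checkpoint") \
    .trigger(processingTime='1 second') \
    .start() \
    .awaitTermination()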

The Debezium connector configuration I am using is as follows:

{
    "name": "wslog-debezium-xstream102",
    "config":
    {
        "connector.class":"io.debezium.connector.oracle.OracleConnector",
        "tasks.max":"1",
        "database.server.name":"abc",
        "database.hostname":"ip-addr",
        "database.port":"port",
        "database.user":"username",
        "database.password":"pass",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "false",
        "key.converter.enhanced.avro.schema.support": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "false",
        "value.converter.enhanced.avro.schema.support": "true",
        "database.out.server.name" : "outbound_servername",
        "database.dbname":"VAULSYS",
        "database.history.kafka.bootstrap.servers":"ipr:9092",
        "snapshot.mode":"initial",
        "database.history.kafka.topic":"schema-changes.inventory",
        "table.include.list":"tablename",
        "column.include.list": "column list",
        "database.history.store.only.captured.tables.ddl":true,
        "database.history.skip.unparseable.ddl":true,
        "offset.flush.interval.ms":10000,
        "offset.flush.timeout.ms":60000,
        "log.mining.batch.size.min":10000,
        "log.mining.batch.size.max":300000,
        "log.mining.batch.size.default":50000,
        "log.mining.view.fetch.size":20000,
        "decimal.handling.mode":"double",
        "time.precision.mode":"connect"
        
    }
}
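
Note that the converters above point at the core v2 API (http://localhost:8082/apis/registry/v2), while the Spark code reads the same registry through the Confluent-compatible facade (http://localhost:8082/apis/ccompat/v6). Since auto-register is enabled, the value schema should show up there under the "<topic>-value" subject; a small check, sketched with the confluent-kafka client:

from confluent_kafka.schema_registry import SchemaRegistryClient

# List the subjects exposed by the ccompat facade to confirm the converter registered the schema.
sr = SchemaRegistryClient({"url": "http://localhost:8082/apis/ccompat/v6"})
print(sr.get_subjects())  # expect something like ['topic-value', ...]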

I can see that I am getting the Avro-encoded data through a Kafka console consumer:

TABLE606472771rڸ���b ��5A6DATA ENCODER-oracle-encodedstr�ڽ��trueENCODE
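
To see exactly what header the converter puts in front of the Avro payload, the first bytes of a raw record can be dumped in hex. A minimal sketch, assuming the confluent-kafka package; the group id "header-inspect" is just a throwaway name:

from confluent_kafka import Consumer

c = Consumer({
    "bootstrap.servers": "ip:9092",
    "group.id": "header-inspect",
    "auto.offset.reset": "earliest",
})
c.subscribe(["topic"])
msg = c.poll(10.0)
if msg is not None and msg.error() is None:
    # Print the leading bytes; the header (magic byte + schema/global id) precedes the Avro data.
    print(msg.value()[:16].hex())
c.close()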

I cannot figure out what mistake I am making here. Has anyone else run into the same thing when working with Kafka and Spark together with Apicurio?

pyspark apache-kafka avro confluent-schema-registry apicurio-registry
1 Answer
I solved the problem by increasing the substring offset from 5 to 10, as can be seen in the following code:

# data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 5, length(value))"))
data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 10, length(value))"))
I know there is always a 5-byte magic-byte prefix when Avro is serialized through Confluent, but I did not know why it worked once I used 10 bytes.
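
A likely explanation (based on Apicurio's default wire format, not something stated in the answer): the Confluent serializer prefixes each value with 1 magic byte plus a 4-byte schema ID (5 bytes total), while Apicurio's Avro serde by default writes 1 magic byte plus an 8-byte global ID, i.e. a 9-byte header. Spark's substring is 1-indexed, so substring(value, 10, ...) skips exactly those 9 bytes, whereas substring(value, 6, ...) left 4 ID bytes glued to the front of the Avro payload, which is what produced the malformed-record error and the all-null rows. A sketch of that assumed layout:

import struct

def split_apicurio_header(value: bytes):
    # Assumed default Apicurio framing: 1 magic byte + 8-byte big-endian global ID + Avro payload.
    magic = value[0]
    global_id = struct.unpack(">q", value[1:9])[0]  # ID of the schema version in the registry
    payload = value[9:]                             # what from_avro should actually be given
    return magic, global_id, payload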
