How to get Avro data (registered with Confluent Schema Registry) from Kafka as a string in pyspark?

0 votes · 2 answers

I am reading data from Kafka in Spark (Structured Streaming), but the value I get from Kafka in Spark is not in string format. Spark version: 2.3.4

Kafka data format:

{"Patient_ID":316,"Name":"Richa","MobileNo":{"long":7049123177},"BDate":{"int":740},"Gender":"female"}

Here is the Kafka Spark Structured Streaming code:

#  spark-submit --jars kafka-clients-0.10.0.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.3.4,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/kinjalpatel/kafka_sppark.py
import pyspark
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json
from pyspark.sql.functions import from_json, col, struct
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from pyspark.sql.column import Column, _to_java_column

sc = SparkContext()
sc.setLogLevel("ERROR")
spark = SparkSession(sc)
schema_registry_client = CachedSchemaRegistryClient(
    url='http://localhost:8081')
serializer = MessageSerializer(schema_registry_client)
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mysql-01-Patient") \
  .option("partition.assignment.strategy", "range") \
  .option("valueConverter", "org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter") \
  .load()
df.printSchema()
mta_stream=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)", "CAST(timestamp AS STRING)", "CAST(timestampType AS STRING)")
mta_stream.printSchema()
qry = mta_stream.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()

This is the output I get:

+----+--------------------+----------------+---------+------+--------------------+-------------+
| key|               value|           topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------------+---------+------+--------------------+-------------+
|null|       � Richa���...|mysql-01-Patient|        0|   160|2019-12-27 11:56:...|            0|
+----+--------------------+----------------+---------+------+--------------------+-------------+

How can I get the value column as a readable string?

apache-spark apache-kafka avro spark-structured-streaming
2 Answers

0 votes

From the Spark documentation:

import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()


val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

From the Databricks documentation:

import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

-1 votes

There is no direct library for reading Avro messages from a Kafka topic and parsing them in pyspark Structured Streaming. However, we can write a small wrapper that reads/parses the Avro messages and then call that function as a UDF in the pyspark streaming code, as sketched below.
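
A minimal sketch of such a wrapper, reusing the `CachedSchemaRegistryClient` and `MessageSerializer` already imported in the question; the function and column names are illustrative, and `df` is the Kafka stream from the question:

import json
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

def make_avro_decoder(registry_url):
    """Return a UDF that turns Confluent-framed Avro bytes into a JSON string."""
    state = {}  # serializer built lazily on each executor; only the URL is captured

    def decode(raw_bytes):
        if raw_bytes is None:
            return None
        if "serializer" not in state:
            client = CachedSchemaRegistryClient(url=registry_url)
            state["serializer"] = MessageSerializer(client)
        # decode_message reads the magic byte + schema ID, fetches the writer
        # schema from the registry, and returns the record as a Python dict.
        record = state["serializer"].decode_message(raw_bytes)
        return json.dumps(record)

    return udf(decode, StringType())

decode_avro = make_avro_decoder("http://localhost:8081")

string_df = df.select(
    col("key").cast("string"),
    decode_avro(col("value")).alias("value"),
    col("topic"), col("partition"), col("offset"), col("timestamp"))

Since the registry is consulted per schema ID, this handles topics whose messages were written with different schema versions.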

Please refer to:

Reading avro messages from Kafka in spark streaming/structured streaming
