I am reading data from Kafka in Spark (Structured Streaming), but the data that Spark receives from Kafka is not in string format. Spark: 2.3.4
Kafka data format:
{"Patient_ID":316,"Name":"Richa","MobileNo":{"long":7049123177},"BDate":{"int":740},"Gender":"female"}
Here is the Kafka Spark Structured Streaming code:
# spark-submit --jars kafka-clients-0.10.0.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.3.4,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/kinjalpatel/kafka_sppark.py
import pyspark
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json
from pyspark.sql.functions import from_json, col, struct
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from pyspark.sql.column import Column, _to_java_column
sc = SparkContext()
sc.setLogLevel("ERROR")
spark = SparkSession(sc)
schema_registry_client = CachedSchemaRegistryClient(
url='http://localhost:8081')
serializer = MessageSerializer(schema_registry_client)
df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "mysql-01-Patient") \
.option("partition.assignment.strategy", "range") \
.option("valueConverter", "org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter") \
.load()
df.printSchema()
mta_stream=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)", "CAST(timestamp AS STRING)", "CAST(timestampType AS STRING)")
mta_stream.printSchema()
qry = mta_stream.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()
This is the output I get:
+----+--------------------+----------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+----------------+---------+------+--------------------+-------------+
|null|�
Richa���...|mysql-01-Patient| 0| 160|2019-12-27 11:56:...| 0|
+----+--------------------+----------------+---------+------+--------------------+-------------+
How can I get the value column in string format?
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc" )))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
val output = df
.select(from_avro('value, jsonFormatSchema) as 'user)
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as 'value)
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
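There is no equivalent direct library for reading Avro messages from a Kafka topic and parsing them in PySpark Structured Streaming. However, we can write a small wrapper that reads and parses the Avro messages, and then call that function as a UDF in the PySpark streaming code.
Please refer to: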
Reading avro messages from Kafka in spark streaming/structured streaming
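The wrapper-plus-UDF approach described above could look roughly like the sketch below. It assumes the messages use the Confluent wire format (a zero magic byte and a 4-byte schema ID in front of the Avro body), which the Schema Registry client in the question suggests but does not confirm. `PATIENT_SCHEMA` is guessed from the JSON sample in the question rather than fetched from the registry, and `read_long`, `read_datum`, `decode_confluent_avro` and `with_decoded_value` are hypothetical helper names. The hand-written decoder only handles records made of int, long, string, null and unions of those; a real job would more likely fetch the writer schema from the registry and use an Avro library.

```python
import json
import struct
from io import BytesIO

# ASSUMPTION: writer schema inferred from the JSON sample in the question.
PATIENT_SCHEMA = {
    "type": "record",
    "name": "Patient",
    "fields": [
        {"name": "Patient_ID", "type": "int"},
        {"name": "Name", "type": "string"},
        {"name": "MobileNo", "type": ["null", "long"]},
        {"name": "BDate", "type": ["null", "int"]},
        {"name": "Gender", "type": "string"},
    ],
}


def read_long(buf):
    """Read one zigzag, variable-length int/long (Avro binary encoding)."""
    shift, acc = 0, 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("truncated Avro payload")
        acc |= (chunk[0] & 0x7F) << shift
        if not chunk[0] & 0x80:  # high bit clear: last byte of the number
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)  # undo zigzag


def read_datum(buf, schema):
    """Decode one value; supports only the types used in PATIENT_SCHEMA."""
    if isinstance(schema, list):  # union: branch index, then the value
        return read_datum(buf, schema[read_long(buf)])
    if isinstance(schema, dict):
        if schema["type"] == "record":
            return {f["name"]: read_datum(buf, f["type"])
                    for f in schema["fields"]}
        return read_datum(buf, schema["type"])
    if schema == "null":
        return None
    if schema in ("int", "long"):
        return read_long(buf)
    if schema == "string":
        return buf.read(read_long(buf)).decode("utf-8")
    raise ValueError("unsupported Avro type in this sketch: %r" % (schema,))


def decode_confluent_avro(raw, schema=PATIENT_SCHEMA):
    """Turn the Kafka `value` bytes into a JSON string (None stays None)."""
    if raw is None:
        return None
    # ASSUMPTION: Confluent framing = magic byte 0 + big-endian schema ID.
    magic, _schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return json.dumps(read_datum(BytesIO(bytes(raw[5:])), schema))


def with_decoded_value(df):
    """Add a string column `value_json` to a Kafka source DataFrame."""
    # Imported lazily so the decoder above can be tested without Spark.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    decode = udf(decode_confluent_avro, StringType())
    return df.withColumn("value_json", decode(col("value")))
```

In the question's script, `with_decoded_value(df)` would take the place of `CAST(value AS STRING)`, and the `value_json` column could then be written to the console sink. The result is plain JSON, so a union field appears as `7049123177` rather than `{"long": 7049123177}`.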