如何将 Kafka 中的字节转换为其原始对象?

问题描述 投票:0回答:2

我从 Kafka 获取数据,然后使用默认解码器反序列化

Array[Byte]
,之后我的 RDD 元素看起来像
(null,[B@406fa9b2)
(null,[B@21a9fe0)
,但我想要具有模式的原始数据,那么我该如何实现这一点?

我以 Avro 格式序列化消息。

apache-spark apache-kafka spark-streaming spark-avro
2个回答
5
投票

您必须使用适当的反序列化器对字节进行解码,例如字符串或自定义对象。

如果你不进行解码,你会得到

[B@406fa9b2
,这只是 Java 中字节数组的文本表示。

Kafka 对消息的内容一无所知,因此它将字节数组从生产者传递到消费者。

在 Spark Streaming 中,您必须对键和值使用序列化器(引用 KafkaWordCount 示例):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

使用上述序列化器,您将获得

DStream[String]
,因此您可以使用
RDD[String]

但是,如果您想直接将字节数组反序列化为自定义类,则必须编写一个自定义 Serializer (这是 Kafka 特定的,与 Spark 无关)。

我建议使用具有固定模式的 JSON 或 Avro(使用Kafka、Spark 和 Avro - 第 3 部分,生成和使用 Avro 消息中描述的解决方案)。


结构化流媒体中,管道可能如下所示:

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string") // <-- conversion here

0
投票

Jacek提供了很好的答案这里

以下答案是他答案的延伸。

如果您使用 Spark 的结构化流来消费数据,那么您可以执行以下操作:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


// let us say your spark session is defined with 'spark'

val readDF = spark.readStream
        .format("kafka")
        .option("subscribe", "topic1")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("groupIdPrefix","whatever-group-id")
        .option("startingOffsets","latest")
        .load
        .select(col("value").cast(StringType))

您可以在生产者端编写任何您想要的数据类型,我以 String 为例。

© www.soinside.com 2019 - 2024. All rights reserved.