在spark结构化流媒体中,Json字符串应该作为Kafka主题消耗,而不是模式。

问题描述 投票:0回答:1

我需要消耗Kafka主题,它为每行产生动态Json字符串,我无法解析没有schema的Json字符串。在我的情况下,Schema可以是动态的。我的情况是,Schema可以动态地解析Json字符串。 可以推断出json模式,但需要 "DATASET "或 "JSON文件"。

有什么办法可以将Kafka的topic(value)转换为DATASET?这样我就可以使用 "DATASET "了。火花.read.json,它接受DATASET作为输入,并能解析json的模式。

UPDATE(UPDATE),它接受DATASET的输入,可以解析json的模式。谢谢你们,为我指明了正确的方向,我得到了我需要的东西。在读流中消耗Kafka主题,并再次处理通过 火花.read.json.但如果我使用以下代码。

val klines = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "host1:port1,host2:port2").
  option("subscribe", "topic").
  load().
  select($"value".cast("string").alias("value"))

val query = klines.
  select(from_json($"value",schema=spark.read.json(klines.as[String]).schema)).
  writeStream.
  format("console").
  start()

query.awaitTermination()

得到以下错误。在线程 "main "中出现异常 org.apache.spark.sql.AnalysisException: 有流媒体源的查询必须用writeStream.start();;kafka。

我正在做一些中间计算,比如扁平化模式。但如果我这样做同样的错误发生。如何在spark结构化流(scala)中处理基本的中间计算?

json apache-spark apache-kafka spark-structured-streaming
1个回答
0
投票

JSON是一个字符串。你可以只是一个字符串类型的schema。

所以我可以使用spark.read.json。

spark.read.json 是来自 文件系统.

你可能想 spark.readStream.format("kafka") 如果你想从Kafka中读取数据,Spark文档中有足够详细的描述。

Spark文档中的第一个例子正是这样做的

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

你会有问题对数据进行任何类型的有用分析,然而鉴于每条记录都有可能不共享相同的字段,所以做一些类似于 get_json_object 毫无意义

你最好使用原始的Kafka消费者API或KStreams。,不需要任何模式,但是你的问题是 模式--它是 反序列化 变成一个具有具体字段的对象类型,可以进行查询

© www.soinside.com 2019 - 2024. All rights reserved.