我需要消耗Kafka主题,它为每行产生动态Json字符串,我无法解析没有schema的Json字符串。在我的情况下,Schema可以是动态的。我的情况是,Schema可以动态地解析Json字符串。 可以推断出json模式,但需要 "DATASET "或 "JSON文件"。
有什么办法可以将Kafka的topic(value)转换为DATASET?这样我就可以使用 "DATASET "了。火花.read.json,它接受DATASET作为输入,并能解析json的模式。
UPDATE(UPDATE),它接受DATASET的输入,可以解析json的模式。谢谢你们,为我指明了正确的方向,我得到了我需要的东西。在读流中消耗Kafka主题,并再次处理通过 火花.read.json.但如果我使用以下代码。
val klines = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", "host1:port1,host2:port2").
option("subscribe", "topic").
load().
select($"value".cast("string").alias("value"))
val query = klines.
select(from_json($"value",schema=spark.read.json(klines.as[String]).schema)).
writeStream.
format("console").
start()
query.awaitTermination()
得到以下错误。在线程 "main "中出现异常 org.apache.spark.sql.AnalysisException: 有流媒体源的查询必须用writeStream.start();;kafka。
我正在做一些中间计算,比如扁平化模式。但如果我这样做同样的错误发生。如何在spark结构化流(scala)中处理基本的中间计算?
JSON是一个字符串。你可以只是一个字符串类型的schema。
所以我可以使用spark.read.json。
spark.read.json
是来自 文件系统.
你可能想 spark.readStream.format("kafka")
如果你想从Kafka中读取数据,Spark文档中有足够详细的描述。
Spark文档中的第一个例子正是这样做的
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
你会有问题对数据进行任何类型的有用分析,然而鉴于每条记录都有可能不共享相同的字段,所以做一些类似于 get_json_object
毫无意义
你最好使用原始的Kafka消费者API或KStreams。,不需要任何模式,但是你的问题是 不 模式--它是 反序列化 变成一个具有具体字段的对象类型,可以进行查询