writeStream()在批次数据中打印空值,即使我在kafka中通过writeStream()提供适当的json数据。

问题描述 投票:0回答:1

我试图使用schema转换json并将值打印到控制台,但writeStream()在所有列中打印空值,即使我给了正确的数据。

我给kafka主题的数据。

{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}
{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}
{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}

下面是我的scala代码...

 val readStreamDFInd = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "IndiaStocks")
  .option("startingOffsets", "earliest")
  .load()

//readStreamDFInd.printSchema()
val readStreamDFUS = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "USStocks")
  .option("startingOffsets", "earliest")
  .load()

val schema = new StructType()
  .add("stock", StringType)
  .add("buy", IntegerType)
  .add("sell", IntegerType)
  .add("profit", IntegerType)
  .add("quantity", IntegerType)
  .add("loss", IntegerType)
  .add("gender", StringType)

val stocksIndia = readStreamDFInd.selectExpr("CAST(value as STRING) as json").select(from_json($"json", schema).as("data")).select("data.*")
val stocksUSA = readStreamDFUS.selectExpr("CAST(value as STRING) as json").select(from_json($"json", schema).as("data")).select("data.*")
stocksIndia.printSchema() stocksUSA.writeStream
  .format("console")
  .outputMode("append").trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()

Below is the null values getting printed for all columns even though i provide proper data

apache-spark apache-kafka apache-spark-sql spark-structured-streaming apache-spark-dataset
1个回答
1
投票

代码工作正常,因为你也可以看到在 书籍.

从文件中可以看出: from_json 功能 null 的值,因为字符串是不可解析的。

=> 你缺少了引号。quantity 字段在你的json字符串中。


0
投票

问题出在你的kafka数据中。数量 栏目应加引号。请检查以下内容。

{"股票": "SEE", "买入":12, "卖出":15, "利润":3。"数量":27,"亏损":0,"性别": "M"}{"股票": "SEE","买入":12,"卖出":15,"盈利":3。"数量":27,"亏损":0,"性别": "M"}{"股票": "SEE","买入":12,"卖出":15,"盈利":3。"数量":27, "损失":0, "性别": "男"}。

© www.soinside.com 2019 - 2024. All rights reserved.