How can I parse JSON data without a schema in Spark Structured Streaming?


I am consuming complex nested JSON data from Kafka with Spark Structured Streaming, and I don't have a specific schema, because most of the data consists of related requestBody and responseBody payloads from a web application.

How can I parse or extract fields from nested JSON data without a schema in Spark Structured Streaming?

Spark version: 2.2.0

Any help is appreciated.

scala apache-spark apache-kafka spark-structured-streaming
1 Answer

You can do the following. I have consumed the Kafka events as (String, String), where the key is the customer id and the value is the JSON event as a String. Explanations are given as comments inside the code:

import org.apache.log4j.Logger
import org.apache.spark.sql.functions.{col, collect_list, from_json}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.sql.{Dataset, _}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

import scala.collection.mutable
import scala.util.Try

object JsonKafkaStreamProcessor {

  val rootLogger: Logger = Logger.getRootLogger

  //The code below works for a nested Kafka JSON event like the following
  /**
   * {
   * "timestamp": "2019-06-12T23:59:59+00:00",
   * "data": {
   * "user_id": 73598
   * }
   * }
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //Constant.getSparkSess is the answerer's own helper that returns a SparkSession
    val spark = Constant.getSparkSess

    //Create and load your Spark DataFrame from Kafka
    val kafkaDf: DataFrame = spark.readStream.format("kafka")
      .option("subscribe", "topic-name")
      .option("kafka.bootstrap.servers", "kafkahost:port")
      .load()

    //Minimal dummy schema containing the field I expect in my JSON
    //replace customer_id with your own mandatory field
    val schema = new StructType()
      .add("customer_id", StringType)

    //implicit format to parse JSON for json4s APIs
    implicit val formats: DefaultFormats.type = DefaultFormats


    //Build Spark's output streaming query
    val outputDf = kafkaDf
      //Select kafka event as String
      .select(col("value").cast("string").as("value"),
        //Parse JSON string to `data` column
        from_json(col("value").cast("string"), schema).as("data"))
      //Select only value from data field in a column "value"
      .select("value", "data.*")
      .writeStream
      //Fail safe
      .option("checkpointLocation", "/checkpoint/dir/")

      //The streaming job is triggered every 1 second
      .trigger(Trigger.ProcessingTime("1 second"))

      //Process every 1-second batch as an individual Dataset
      .foreachBatch((dsw: Dataset[_], batchId: Long) => {
        val recordsCount = dsw.count()
        rootLogger.info(s"""DataFrame record count="${recordsCount}" batchId=${batchId} """)

        //Group the dataset by customer id and collect the events into a list
        dsw.groupBy("customer_id")
          .agg(collect_list("value").cast(ArrayType(StringType)).as("value"))
          .foreach((row: Row) => {
            //You can read the customer id directly from the grouped row
            val customerId = row.getAs[String]("customer_id")

            //JSOn event are still List<String> so iterate 
            val data = row.getAs[mutable.WrappedArray[String]]("value")
            data
              .array
              .map(event => {
                //Parse individual JSON using json4s APIs
                val parsedJson = parse(event)
                //Query the JSON to extract nested values (user_id sits under "data" in the sample event)
                val timeStampStr: String = (parsedJson \ "timestamp").extract[String]
                val user_id: String = Try((parsedJson \ "data" \ "user_id").extract[String]).getOrElse("no_user_")
                rootLogger.info(s"timeStampStr=${timeStampStr} user_id=${user_id}")
              })

          })
      })
    outputDf.start().awaitTermination()
  }

}
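
If you'd rather not declare even a minimal StructType, one alternative available on Spark 2.2.0 is get_json_object, which extracts individual fields from the raw JSON string by path. Below is a minimal sketch that reuses the kafkaDf from the answer above; the JSON paths "$.timestamp" and "$.data.user_id" are assumptions based on the sample event in the code comments, so adjust them to your own payload.

import org.apache.spark.sql.functions.{col, get_json_object}

//Sketch: pull nested fields by JSON path without defining a StructType
//(paths follow the sample event shown in the comments above)
val flattenedDf = kafkaDf
  .select(col("value").cast("string").as("value"))
  .select(
    col("value"),
    get_json_object(col("value"), "$.timestamp").as("timestamp"),
    get_json_object(col("value"), "$.data.user_id").as("user_id"))

Fields that are missing from a given event simply come back as null, so you can filter or default them downstream.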