如何转义'是保留关键字,不能用作字段名称'在Spark SQL和结构化流中出错?

问题描述 投票:0回答:1

[当前,当我使用结构化流v2.1.0 + Kafka v0.10进行实时日志处理时,我在线程“ main” java.lang.UnsupportedOperationException中得到了异常:package是一个保留关键字,不能用作字段名

我的任务要求两个逻辑部分:

第1部分。通过net.liftweb.json将包含来自字符串的json的日志消息转换为相应的案例类]

我的一个案例类,定义如下:

case class Mobile(val title: Option[String],
                  val desc: Option[String],
                  val adtype: Option[Int],
                  val apkname: Option[String],
                  @transient val `package`: Option[String],
                  val appstoreid: Option[String]
                 ) extends java.io.Serializable

第2部分。使用结构化流v2.1.0 + kafka v0.10进行实时处理:

    val spark: SparkSession = SparkSession.
      builder().
      appName("structured streaming test").
      getOrCreate()

    val df = spark.
      readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "localhost:9092").
      option("subscribe", "stream1").
      option("maxOffsetPerTrigger", "10000").
      load()

    import spark.implicits._

    val ds = df.

      //change the value's type from binary to STRING
      selectExpr("CAST(value AS STRING)").
      as[String].
      map(myLogicToProcessLogs)

    val query = ds.
      writeStream.
      outputMode("append").
//      format("console").
      trigger(ProcessingTime("10 seconds")).
      foreach(new HttpSink).
      start()

    query.awaitTermination()

我的错误原因是我的日志消息中包含一些Java保留关键字,例如'package',但总是会因Spark SQL编码器而失败

[注意:通过使用package来转义scala关键字检查,并使用@transient关键字来转义Java序列化,我可以成功地将上述case类转换为RDD,并进行后续的转换和操作以进行批处理,而不会出现任何错误提示。但是,如何从Spark SQL编码器和Structrued Streaming中逃避关键字检查?

有一个相关的问题:spark-submit fails when case class fields are reserved java keywords with backticks但我可以做到这一点,因为liftweb json仍需要案例类构造函数参数“ package”进行解析。

[我还发现还有其他json工具,例如Gson提供JSON字段命名支持,可以将标准Java字段名称转换为Json字段名称,在liftweb json上是否有类似的方法?https://sites.google.com/site/gson/gson-user-guide#TOC-JSON-Field-Naming-Support

scala apache-spark apache-spark-sql spark-structured-streaming lift-json
1个回答
0
投票

使用ScalaReflection为我的案例类生成架构时,我遇到了这个问题的变体。我通过以编程方式定义架构来解决它:

val schema = StructType(Seq(
  StructField("title", StringType),
  StructField("desc", StringType),
  StructField("adtype", IntegerType),
  StructField("apkname", StringType),
  StructField("package", StringType),
  StructField("appstoreid", StringType)
))

您仍然可以按常规方式使用case类,只是不要使用它通过org.apache.spark.sql.catalyst.ScalaReflection.schemaFor [CC]]生成用于解析字符串的模式。

© www.soinside.com 2019 - 2024. All rights reserved.