How to read an input JSON using a schema file and populate default values when a column is not found, in Scala?


Input dataframe

val input_json="""[{"orderid":"111","customers":{"customerId":"123"},"Offers":[{"Offerid":"1"},{"Offerid":"2"}]}]""";
val inputdataRdd = spark.sparkContext.parallelize(input_json :: Nil);
val inputdataRdddf = spark.read.json(inputdataRdd);
inputdataRdddf.show();

Schema dataframe

 val schema_json="""[{"orders":{"order_id":{"path":"orderid","type":"string","nullable":false},"customer_id":{"path":"customers.customerId","type":"int","nullable":false,"default_value":"null"},"offer_id":{"path":"Offers.Offerid","type":"string","nullable":false},"eligible":{"path":"eligible.eligiblestatus","type":"string","nullable":true,"default_value":"not eligible"}},"products":{"product_id":{"path":"product_id","type":"string","nullable":false},"product_name":{"path":"products.productname","type":"string","nullable":false}}}]""";
val schemaRdd = spark.sparkContext.parallelize(schema_json :: Nil);
val schemaRdddf = spark.read.json(schemaRdd);
schemaRdddf.show();

Using the schema df, I want to read all columns from the input dataframe.

  1. If the nullable key is true, I want to populate the column with the default value when the column does not exist or has no data. In the example above, eligible.eligiblestatus does not exist, so I want to fill in a default value.
  2. I also want to change the data type of each column based on the type key defined in the schema JSON. For example, customer ID is of type int in the schema JSON but arrives as a string in the input dataframe, so I want to cast it to an integer.
  3. The final column names should be taken from the keys of the schema JSON. For example, order_id is the key for the orderid attribute.

The final DF should have columns like:

order_id: string, customer_id: int, offer_id: string (array cast to string), eligible: string
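
For concreteness, here is a hand-written select that would produce that target from the example input above (a sketch only; the generic, schema-driven version is worked out in the answer below):

inputdataRdddf.selectExpr(
  "CAST(orderid AS string) AS order_id",
  "CAST(customers.customerId AS int) AS customer_id",
  "CAST(Offers.Offerid AS string) AS offer_id",   // array of Offerid values rendered as a string
  "CAST('not eligible' AS string) AS eligible"    // default value, since the path does not exist
).show(false)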

scala apache-spark
1 Answer

Please find the code below.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StructType}
import spark.implicits._ // for .as[Seq[String]] below; already in scope in the spark-shell

// DDL type used to re-parse each top-level schema column into a
// map<output column name, metadata struct>.
val jsonSchema = """
      map<
        string, 
        struct<
            path:string,
            type:string,
            nullable:string,
            default_value:string  
        >
      >
    """

Helper class to get the nested column names from a dataframe.

implicit class DFHelpers(df: DataFrame) {
  def fields: Seq[String] =
    this.fields(df.schema)
  def fields(
      schema: StructType = df.schema,
      root: String = "",
      sep: String = "."
  ): Seq[String] = {
    schema.fields.flatMap { column =>
      column match {
        // Struct column: recurse, prefixing child fields with the struct name.
        case _ if column.dataType.isInstanceOf[StructType] =>
          fields(
            column.dataType.asInstanceOf[StructType],
            s"${root}.${column.name}".stripPrefix(sep)
          )
        // Array column: recurse into its element type if that is a struct.
        case _ if column.dataType.isInstanceOf[ArrayType] =>
          column.dataType
            .asInstanceOf[ArrayType]
            .productIterator
            .filter(_.isInstanceOf[StructType])
            .map(_.asInstanceOf[StructType])
            .flatMap(f => fields(f, s"${root}.${column.name}".stripPrefix(sep)))
        // Leaf column: emit its dot-separated path.
        case _ => Seq(s"${root}.${column.name}".stripPrefix(sep))
      }
    }.toList
  }
}
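
For the input dataframe built in the question, the helper yields the flattened paths (a sketch; ordering follows the alphabetically sorted inferred schema):

inputdataRdddf.fields
// => roughly List(Offers.Offerid, customers.customerId, orderid)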
The code below refers to inputDF, schemaDF and inputFields, so first bind them to the dataframes built in the question and to the flattened field paths of the input:

val inputDF = inputdataRdddf
val schemaDF = schemaRdddf
val inputFields = inputDF.fields

For every top-level schema column, build one SQL expression that turns its map entries into "CAST( ... ) AS <alias>" fragments, falling back to the default value when the path is missing from the input:

val schemaExprs = schemaDF.columns.map { columnName =>
  s"""
    filter(
       transform(
          map_entries(${columnName}),
          e ->
             CASE WHEN fields[e.value.path] IS NULL AND e.value.default_value != 'null' THEN
                     CONCAT("CAST( '", e.value.default_value, "' AS ", e.value.type, " )", " AS ", e.key)
                  WHEN fields[e.value.path] IS NOT NULL THEN
                     CONCAT('CAST( ', e.value.path, ' AS ', e.value.type, ' )', ' AS ', e.key)
              END
       ),
       f -> f IS NOT NULL
    ) AS ${columnName}
  """
}
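// One expression is generated per top-level schema column ("orders" and
// "products"). Each turns that column's map entries into an array of
// "CAST( <path or default> AS <type> ) AS <alias>" fragments and drops entries
// whose path is absent from the input and that have no default_value.
// schemaExprs.foreach(println)   // uncomment to inspect the generated SQL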
val columns = schemaDF
  .selectExpr(
    schemaDF.columns.map(c =>
      s"from_json(to_json(${c}), '${jsonSchema}') AS ${c}"
    ): _*
  )
  .withColumn("fields", typedLit(inputFields.map(f => f -> f).toMap)) // Checking if inputDF fields exist in the schemaDF, if not, default values will be used; if default values are not available, the column will be removed.
  .selectExpr(schemaExprs: _*)
  .select(schemaDF.columns.map(col(_)).reduce(array_union).as("columns"))
  .as[Seq[String]]
  .collect()
  .flatten
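// With the example data, `columns` ends up holding only the fragments derived
// from "orders"; both "products" paths are absent from the input and have no
// default_value, so they were filtered out. Roughly:
//   CAST( customers.customerId AS int ) AS customer_id
//   CAST( 'not eligible' AS string ) AS eligible
//   CAST( Offers.Offerid AS string ) AS offer_id
//   CAST( orderid AS string ) AS order_id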
inputDF.selectExpr(columns: _*).show(false)
+-----------+------------+--------+--------+
|customer_id|eligible    |offer_id|order_id|
+-----------+------------+--------+--------+
|123        |not eligible|[1, 2]  |111     |
+-----------+------------+--------+--------+