How do I rename the existing fields and add a new field inside an array of structs?

Question    Votes: 0    Answers: 1

Environment: Spark 2.4.5

source.json

{
    "group": "1",
    "name": "badboi",
    "rank": "3",
    "fellows": [
        {
            "name": "David",
            "age": "25",
            "hobby": "code"
        },
        {
            "name": "John",
            "age": "27",
            "hobby": "tennis"
        },
        {
            "name": "Anata",
            "age": "23",
            "hobby": "dance"
        }
    ]
}

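I want to add a new field 'ID' to every element of the 'fellows' array (generated by applying md5 to the JSON body of 'name') and rename the other fields of each element, for example: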

output.json

{
    "group": "1",
    "name": "badboi",
    "rank": "3",
    "fellows": [
        {
            "ID":"6F94AF80FC86BD2DBFAFA9C90BF33522",
            "NAME": "David",
            "AGE": "25",
            "HOBBY": "code"
        },
        {
            "ID":"CF848467689DD81CAC9E644F8294B641",
            "NAME": "John",
            "AGE": "27",
            "HOBBY": "tennis"
        },
        {
            "ID":"4F11EBFF1667DDD817921279EEBD5451",
            "NAME": "Anata",
            "AGE": "23",
            "HOBBY": "dance"
        }
    ]
}

My solutions:

1

I have tried solving it with the 'explode' and 'collect_set' functions:

  import org.apache.spark.sql.functions._
  import spark.implicits._

  val source = spark.read.option("multiLine", "true").json("/mypath/source.json")

  // One row per fellow
  val explode_values = source.select($"group", $"name", $"rank", explode($"fellows").as("explode_fellows"))

  // Rebuild each fellow as a struct with upper-case field names plus the md5-based ID
  val renamedDF = explode_values.select($"group", $"name", $"rank",
    struct(
      md5(to_json(struct($"explode_fellows.name".as("NAME")))).as("ID"),
      $"explode_fellows.name".as("NAME"),
      $"explode_fellows.age".as("AGE"),
      $"explode_fellows.hobby".as("HOBBY")
    ).as("fellows"))

  // Collapse back to one row per group
  val result = renamedDF.groupBy($"group").agg(first($"name").as("name"), first($"rank").as("rank"), collect_set($"fellows").as("fellows"))

The schema of the result is then:

root
 |-- group: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- fellows: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- AGE: string (nullable = true)
 |    |    |-- HOBBY: string (nullable = true)
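
For reference, solution 1 hashes the output of to_json(struct(name as NAME)), which is the compact JSON string {"NAME":"David"} for the first fellow. A minimal plain-Scala sketch of that computation follows; the helper names md5Hex and fellowId are hypothetical and only illustrate what is being hashed, and the string is built by hand without JSON escaping, so it only holds for names without quotes or backslashes.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper: lower-case hex MD5 of a UTF-8 string.
def md5Hex(s: String): String =
  MessageDigest.getInstance("MD5")
    .digest(s.getBytes(StandardCharsets.UTF_8))
    .map(b => f"${b & 0xff}%02x")
    .mkString

// Hypothetical helper: hash of the one-field JSON object used in solution 1.
// Assumption: the name needs no JSON escaping.
def fellowId(name: String): String = md5Hex(s"""{"NAME":"$name"}""")

println(fellowId("David"))
```

The digest here is lower-case hex. The sample output.json shows upper-case IDs, so an extra upper-casing step (upper(...) in Spark, .toUpperCase in plain Scala) would be needed to match that format.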

2

Using 'arrays_zip' I can only rename the fields:

val result2 = source.select($"group", $"name", $"rank", arrays_zip($"fellows.name", $"fellows.age", $"fellows.hobby").cast("array<struct<NAME: string, AGE:string, HOBBY:string>>").as("fellows"))

The schema of the result is then:

root
 |-- group: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- fellows: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- AGE: string (nullable = true)
 |    |    |-- HOBBY: string (nullable = true)

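Notes:

The 'explode' and 'collect_set' solution does not meet my requirements because it is too complicated.

Alternatively, it would be very helpful if you could add the md5 ID generation to my solution 2.

Thanks in advance for any suggestions.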

apache-spark pyspark apache-spark-sql spark-streaming
1 answer
3 votes

IIUC, an alternative approach could look like this:

1. Load the data

   val data =
      """
        |{
        |    "group": "1",
        |    "name": "badboi",
        |    "rank": "3",
        |    "fellows": [
        |        {
        |            "name": "David",
        |            "age": "25",
        |            "hobby": "code"
        |        },
        |        {
        |            "name": "John",
        |            "age": "27",
        |            "hobby": "tennis"
        |        },
        |        {
        |            "name": "Anata",
        |            "age": "23",
        |            "hobby": "dance"
        |        }
        |    ]
        |}
      """.stripMargin

    import spark.implicits._ // needed for toDS()

    val df = spark.read.option("multiLine", "true").json(Seq(data).toDS())
    df.show(false)
    df.printSchema()

    /**
      * +-----------------------------------------------------------+-----+------+----+
      * |fellows                                                    |group|name  |rank|
      * +-----------------------------------------------------------+-----+------+----+
      * |[[25, code, David], [27, tennis, John], [23, dance, Anata]]|1    |badboi|3   |
      * +-----------------------------------------------------------+-----+------+----+
      *
      * root
      * |-- fellows: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- age: string (nullable = true)
      * |    |    |-- hobby: string (nullable = true)
      * |    |    |-- name: string (nullable = true)
      * |-- group: string (nullable = true)
      * |-- name: string (nullable = true)
      * |-- rank: string (nullable = true)
      */
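
As the printed schema shows, the inferred struct fields came out in alphabetical order (age, hobby, name) rather than in the order of the source JSON. If the field order matters, for example for a positional cast, an explicit schema can be supplied instead of relying on inference. The following is a sketch under assumptions: the local SparkSession, the shortened one-line JSON sample and the names fellow, schema and typed are all illustrative and not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("explicit-schema").getOrCreate()
import spark.implicits._

// Fields are declared in the order they should appear.
val fellow = new StructType()
  .add("name", StringType)
  .add("age", StringType)
  .add("hobby", StringType)
val schema = new StructType()
  .add("group", StringType)
  .add("name", StringType)
  .add("rank", StringType)
  .add("fellows", ArrayType(fellow))

// Shortened sample with the same shape as source.json.
val json = """{"group":"1","name":"badboi","rank":"3","fellows":[{"name":"David","age":"25","hobby":"code"}]}"""

val typed = spark.read.schema(schema).json(Seq(json).toDS())
typed.printSchema()
```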

2. Transform to add md5(name) as ID and change the case of the struct fields

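    import org.apache.spark.sql.functions.expr

    // TRANSFORM rebuilds every array element in place; the ID is the md5 of the JSON string {"ID":"<name>"}
    val processedDF = df.withColumn("fellows",
      expr("TRANSFORM(fellows, x -> named_struct('ID', md5(to_json(named_struct('ID', x.name))), 'NAME', x.name, 'AGE', x.age, 'HOBBY', x.hobby))"))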
    processedDF.show(false)
    processedDF.printSchema()

    /**
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
      * |fellows                                                                                                                                                          |group|name  |rank|
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
      * |[[464e07afc9e46359fb480839150595c5, David, 25, code], [61409aa1fd47d4a5332de23cbf59a36f, John, 27, tennis], [540356fa1779480b07d0743763c78159, Anata, 23, dance]]|1    |badboi|3   |
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
      *
      * root
      * |-- fellows: array (nullable = true)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- ID: string (nullable = true)
      * |    |    |-- NAME: string (nullable = true)
      * |    |    |-- AGE: string (nullable = true)
      * |    |    |-- HOBBY: string (nullable = true)
      * |-- group: string (nullable = true)
      * |-- name: string (nullable = true)
      * |-- rank: string (nullable = true)
      */
    processedDF.toJSON.show(false)

//    {
//      "fellows": [{
//      "ID": "464e07afc9e46359fb480839150595c5",
//      "NAME": "David",
//      "AGE": "25",
//      "HOBBY": "code"
//    }, {
//      "ID": "61409aa1fd47d4a5332de23cbf59a36f",
//      "NAME": "John",
//      "AGE": "27",
//      "HOBBY": "tennis"
//    }, {
//      "ID": "540356fa1779480b07d0743763c78159",
//      "NAME": "Anata",
//      "AGE": "23",
//      "HOBBY": "dance"
//    }],
//      "group": "1",
//      "name": "badboi",
//      "rank": "3"
//    }
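
The question also asks whether the 'arrays_zip' approach (solution 2) can be extended with the ID. One possible sketch, not verified against the asker's environment, computes one hash per name with the same TRANSFORM higher-order function and zips that array in as the first field. Assumptions are labeled in the comments: the local SparkSession and the shortened sample are illustrative, 'NAME' is used as the JSON key to mirror solution 1 (the answer above uses 'ID'), and upper(...) is added only to match the upper-case IDs shown in output.json.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{arrays_zip, col, expr}

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("zip-with-id").getOrCreate()
import spark.implicits._

// Shortened sample with the same shape as source.json.
val json = """{"group":"1","name":"badboi","rank":"3","fellows":[{"name":"David","age":"25","hobby":"code"},{"name":"John","age":"27","hobby":"tennis"}]}"""
val source = spark.read.json(Seq(json).toDS())

// One hash per fellow: md5 over the JSON string {"NAME":"<name>"}, upper-cased.
val ids = expr("transform(fellows.name, n -> upper(md5(to_json(named_struct('NAME', n)))))")

// Zip the ID array with the existing field arrays, then rename the fields by position via the cast.
val zipped = source.select(
  col("group"), col("name"), col("rank"),
  arrays_zip(ids, col("fellows.name"), col("fellows.age"), col("fellows.hobby"))
    .cast("array<struct<ID:string,NAME:string,AGE:string,HOBBY:string>>")
    .as("fellows"))

zipped.printSchema()
zipped.show(false)
```

Compared with the single TRANSFORM call in step 2, this keeps the shape of solution 2 but relies on the cast renaming struct fields by position, so the order of the arguments to arrays_zip has to match the order of the fields in the cast.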