Spark cannot explode a column


For the given JSON response:

{
    "id": "1575972348068_1649088229",
    "results": [
        {
            "rows_count": 53,
            "runtime_seconds": 0.004000000189989805,
            "columns": [
                "ROLE_ID",
                "ROLE_NAME"
            ],
            "columns_type": [
                "number",
                "string"
            ],
            "limit": 2000000000,
            "index": 0,
            "rows": [
                [
                    "6",
                    "Incentive Plan Advisor                                                                              "
                ],
                [
                    "7",
                    "Security Admin                                                                                      "
                ]
            ],
            "command": "<an sql command>"
        }
    ],
    "status": "completed"
}

I want to get the rows in this JSON as a Spark DataFrame. To do that, I am trying to explode the results entry like this:

response.show()
val flattened = response.select($"results", explode($"results").as("results_flat1")).select($"results_flat1")
flattened.show()

I get this output:

+--------------------+--------------------+---------+
|                  id|             results|   status|
+--------------------+--------------------+---------+
|1575972687102_374...|[[[ROLE_ID, ROLE_...|completed|
+--------------------+--------------------+---------+

+--------------------+
|       results_flat1|
+--------------------+
|[[ROLE_ID, ROLE_N...|
+--------------------+
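(For reference, response above is simply this JSON payload parsed into a DataFrame. A minimal sketch of one way to build it, where jsonString is a hypothetical placeholder, not from the original post, holding the payload as a single string:)

import spark.implicits._

// Hypothetical setup: jsonString holds the JSON payload shown above as one string.
val response = spark.read.json(Seq(jsonString).toDS())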

When I try to explode again, I get this error:

flattened.select($"results_flat1", explode($"results_flat1").as("results_flat2"))
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`results_flat1`)' due to data type mismatch: input to function explode should be array or map type, not struct<columns:array<string>,columns_type:array<string>,command:string,index:bigint,limit:bigint,rows:array<array<string>>,rows_count:bigint,runtime_seconds:double>;;
'Project [results_flat1#91, explode(results_flat1#91) AS results_flat2#99]
+- Project [results_flat1#91]
   +- Project [results#75, results_flat1#91]
      +- Generate explode(results#75), false, [results_flat1#91]
         +- LogicalRDD [id#74, results#75, status#76], false

From my analysis, it looks like explode needs a string or an array of strings to work on. So I tried:

val x = spark.read.json(Seq(flattened.first().get(0).asInstanceOf[String]).toDS())
x.show()

Trying this gives another error:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:255)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)
    at com.ibm.cmdwcloud.operations.SelectOperations.getRoleListFromEntitlement(SelectOperations.scala:23)
    at com.ibm.cmdwcloud.Main$.main(Main.scala:22)
    at com.ibm.cmdwcloud.Main.main(Main.scala)

I don't know of a way to get at the rows object directly and convert it to a DataFrame. Please help with this.
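(The ClassCastException happens because the only column of flattened is a struct, so first().get(0) returns a GenericRowWithSchema rather than a String. A minimal sketch of pulling the nested rows out through the Row API instead; the field name "rows" comes from the JSON above:)

// get(0) yields the results_flat1 struct as a Row, so getString fails on it.
val struct = flattened.first().getStruct(0)

// The rows field is an array<array<string>>, surfaced on the Row as Seq[Seq[String]].
val rawRows = struct.getAs[Seq[Seq[String]]]("rows")

Staying in the DataFrame API, as in Edit 2 below, is usually the cleaner route.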

Edit:

Although I am able to see this schema:

root
 |-- results_flat1: struct (nullable = true)
 |    |-- columns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- columns_type: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- command: string (nullable = true)
 |    |-- index: long (nullable = true)
 |    |-- limit: long (nullable = true)
 |    |-- rows: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- rows_count: long (nullable = true)
 |    |-- runtime_seconds: double (nullable = true)

I am still unable to explode it.

Edit 2:

Thanks to the answer below, I am getting closer to what I want. I did this:

val flattened = response.select($"results", explode($"results").as("results_flat1"))
  .select("results_flat1.*")
  .select($"rows", explode($"rows").as("rows_flat"))
  .select($"rows_flat")

flattened.show()

and got this output:

+--------------------+
|           rows_flat|
+--------------------+
|[6, Incentive Pla...|
|[7, Security Admi...|
+--------------------+

Is it now possible to explode this further and map it onto a schema, so that I get something like:

+--------------------+--------------------+
|             role_id|           role_name|
+--------------------+--------------------+
|                   6| Incentive Plan Ad..|
|                   7|      Security Admin|
+--------------------+--------------------+


scala apache-spark apache-spark-sql apache-spark-dataset
1 Answer
Does this work?
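A sketch along these lines (untested; it selects the two array elements by position, casts the id, and trims the padded role names; role_id and role_name are simply the aliases from the expected output above):

import org.apache.spark.sql.functions._

val result = flattened.select(
  $"rows_flat".getItem(0).cast("long").as("role_id"),  // first array element, cast to a numeric id
  trim($"rows_flat".getItem(1)).as("role_name")        // second element, with the CHAR padding trimmed
)

result.show()

getItem(n) addresses an array element by 0-based index; on Spark 2.4+ the element_at function (which is 1-based) does the same.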