无需中间 pojo 即可将 JsonArray 转换为 Spark 数据集<Row>

问题描述 投票:0回答:1

从 http 调用中,我收到一个具有以下格式的 Json

{
  "name": "foo",
  "version": 1,
  "uploadDate": "2023-04-17",
  "data": [
    {
      "abc": "123",
      "xyz": "",
      "alpha": "4"
    },
     {
      "abc": "456",
      "xyz": "",
      "alpha": "1"
    },
     {
      "abc": "679",
      "xyz": "",
      "alpha": "2"
    },
     {
      "abc": "890",
      "xyz": "",
      "alpha": "5"
    }
  ]
}

我想提取

data
元素,把它变成一个
Dataset<Row>
并保存到镶木地板格式文件中。

如果我调用这个 json 字符串

responseBody
,我将提取
data
元素

  JsonObject responseBodyObject = new JsonParser().parse(responseBody).getAsJsonObject();
  String dataString = new Gson().toJson(responseBodyObject.get("data"));

让字符串再次对象到字符串对我来说已经有点浪费了。我这样做是因为我会把它喂给

spark.read().json(string)
.

我的最终目标是将

data
转换为具有以下结构的 Spark
Dataset<Row>

| abc | xyz | alpha |
---------------------
| 123 |     | 4     |
| 456 |     | 1     |
| 679 |     | 2     |
| 890 |     | 5     |

是否可以在不创建与 json 列表中的对象匹配的 POJO 对象的情况下执行此操作?我可以只使用 JsonElement 或 JsonArray 吗?我宁愿避免为每个 http 端点创建一个新类型(不想每次端点更改结构时都更改该代码,因为那一侧的油漆还没有完全干燥),我将立即写无论如何,我都不会将该数据集转换为镶木地板文件,我不会对这些数据进行任何操作。所以我觉得像 Row 这样的模糊类型就足够了。

我对 spark 不是很熟悉,你会怎么做?

编辑:语法 + 一些关于为什么我不想要 POJO 的额外信息

java apache-spark gson
1个回答
0
投票

所以我试图通过加载 json 数据来解决(scala 语言)

val df = spark.read.format("json").option("multiline","True").load("path")

df.show()

+--------------------+----+----------+-------+
|                data|name|uploadDate|version|
+--------------------+----+----------+-------+
|[[123, 4, ], [456...| foo|2023-04-17|      1|
+--------------------+----+----------+-------+

df.printSchema()
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- abc: string (nullable = true)
 |    |    |-- alpha: string (nullable = true)
 |    |    |-- xyz: string (nullable = true)
 |-- name: string (nullable = true)
 |-- uploadDate: string (nullable = true)
 |-- version: long (nullable = true)

数据将显示为数据集 但是你可以熟悉地爆炸数据。所以

 val df1 = df.withColumn("data",explode($"data")).select("data.*","name","version","uploadDate) 
    df1.show()

输出看起来像这样

+---+-----+---+----+----------+-------+
|abc|alpha|xyz|name|uploadDate|version|
+---+-----+---+----+----------+-------+
|123|    4|   | foo|2023-04-17|      1|
|456|    1|   | foo|2023-04-17|      1|
|679|    2|   | foo|2023-04-17|      1|
|890|    5|   | foo|2023-04-17|      1|
+---+-----+---+----+----------+-------+

如果你不想添加额外的列,那么你可以像这样修改代码

val df1 = df.withColumn("data",explode($"data")).select("data.*") 
    df1.show()

输出将如你所愿

+---+-----+---+
|abc|alpha|xyz|
+---+-----+---+
|123|    4|   |
|456|    1|   |
|679|    2|   |
|890|    5|   |
+---+-----+---+

在java语言中通过

import static org.apache.spark.sql.functions.*;

// Assuming the input DataFrame is named "df"
DataFrame df1 = df.withColumn("data", explode(col("data")))
                  .select("data.*");
df1.show();
© www.soinside.com 2019 - 2024. All rights reserved.