将Spark数据帧[行]转换为Map [String,Any]

问题描述 投票:1回答:1

有什么方法可以将Spark数据帧转换为Dataset[Map[String,Any]],以便一旦将其转换为Map,就可以在该行上执行Map Side工作。该文件的架构主要是不稳定的,因此实际上不可能在编译时使用dataframe.as[MyClass]之类的乘积编码器来创建案例类。

这里的复杂性是,数据可以嵌套,并且可以在其中包含Maps和Lists。

以Json表示的示例数据

{
    "field1": "Sally",
    "field2": "Green",
    "field3": 27,
    "subObject": {
        "subField": "Value"
    },
    "fieldArray": ["A","B","C"],
    "accounting": [
        {
            "firstName": "John",
            "lastName": "Doe",
            "nestedSubField": {
                "x": "y"
            },
            "age": [11,2,33]
        },
        {
            "firstName": "Mary",
            "lastName": "Smith",
            "age": [11,2,33]
        }
    ],
    "sales": [
        {
            "firstName": "Sally",
            "lastName": "Green",
            "age": 27
        },
        {
            "firstName": "Jim",
            "lastName": "Galley",
            "age": 41
        }
    ]
}

[当此数据加载到数据框时,我们获得该数据框的以下架构。

Dataframe Schema

  root
     |-- accounting: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- age: array (nullable = true)
     |    |    |    |-- element: long (containsNull = true)
     |    |    |-- firstName: string (nullable = true)
     |    |    |-- lastName: string (nullable = true)
     |    |    |-- nestedSubField: struct (nullable = true)
     |    |    |    |-- x: string (nullable = true)
     |-- field1: string (nullable = true)
     |-- field2: string (nullable = true)
     |-- field3: long (nullable = true)
     |-- fieldArray: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- sales: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- age: long (nullable = true)
     |    |    |-- firstName: string (nullable = true)
     |    |    |-- lastName: string (nullable = true)
     |-- subObject: struct (nullable = true)
     |    |-- subField: string (nullable = true)

有什么方法可以将该数据帧转换为Map [String,Any],其外观如下。 格式化了一下。

Map(
    accounting -> List(
            Map(
                firstName -> John, 
                lastName -> Doe, 
                nestedSubField -> Map(x -> y), 
                age -> List(11, 2, 33)
            ),
            Map(
                firstName -> Mary, 
                lastName -> Smith, 
                age -> List(11, 2, 33)
            )
        ),
    fieldArray -> List(A, B, C),
    subObject -> Map(subField -> Value), 
    field1 -> Sally, 
    field3 -> 27, 
    sales -> List(
            Map(
                firstName -> Sally, 
                lastName -> Green, 
                age -> 27
                ), 
            Map(
                firstName -> Jim, 
                lastName -> Galley, 
                age -> 41)
                ), 
    field2 -> Green
)

目前,我正在实现以下目标。 JsonUtil是Jackson API的包装器]

val dataframeAsJsonDataset:Dataset[String] = dataframe.toJSON
val result:Dataset[Map[String,Any]] = dataframeAsJsonDataset.map(each=>JsonUtils.fromJson(each,classOf[Map[String,Any]]))

上述方法确实很糟糕,效果也很差。关于此的任何建议将非常有帮助。

[有什么方法可以将Spark数据框转换为数据集[Map [String,Any]],以便一旦将其转换为Map后,就可以在该行上执行地图端作业操作。文件的模式主要是易变的...

scala apache-spark apache-spark-sql databricks apache-spark-dataset
1个回答
0
投票

如果您的方案不断发展,则可能是数据质量问题。这要求使用spark-records之类的东西通过适当的错误检查来自定义分析数据。

© www.soinside.com 2019 - 2024. All rights reserved.