有什么方法可以将Spark数据帧转换为Dataset[Map[String,Any]]
,以便一旦将其转换为Map,就可以在该行上执行Map Side工作。该文件的架构主要是不稳定的,因此实际上不可能在编译时使用dataframe.as[MyClass]
之类的乘积编码器来创建案例类。
这里的复杂性是,数据可以嵌套,并且可以在其中包含Maps和Lists。
以Json表示的示例数据
:{ "field1": "Sally", "field2": "Green", "field3": 27, "subObject": { "subField": "Value" }, "fieldArray": ["A","B","C"], "accounting": [ { "firstName": "John", "lastName": "Doe", "nestedSubField": { "x": "y" }, "age": [11,2,33] }, { "firstName": "Mary", "lastName": "Smith", "age": [11,2,33] } ], "sales": [ { "firstName": "Sally", "lastName": "Green", "age": 27 }, { "firstName": "Jim", "lastName": "Galley", "age": 41 } ] }
[当此数据加载到数据框时,我们获得该数据框的以下架构。
Dataframe Schema
root |-- accounting: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- age: array (nullable = true) | | | |-- element: long (containsNull = true) | | |-- firstName: string (nullable = true) | | |-- lastName: string (nullable = true) | | |-- nestedSubField: struct (nullable = true) | | | |-- x: string (nullable = true) |-- field1: string (nullable = true) |-- field2: string (nullable = true) |-- field3: long (nullable = true) |-- fieldArray: array (nullable = true) | |-- element: string (containsNull = true) |-- sales: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- age: long (nullable = true) | | |-- firstName: string (nullable = true) | | |-- lastName: string (nullable = true) |-- subObject: struct (nullable = true) | |-- subField: string (nullable = true)
有什么方法可以将该数据帧转换为Map [String,Any],其外观如下。 格式化了一下。
Map( accounting -> List( Map( firstName -> John, lastName -> Doe, nestedSubField -> Map(x -> y), age -> List(11, 2, 33) ), Map( firstName -> Mary, lastName -> Smith, age -> List(11, 2, 33) ) ), fieldArray -> List(A, B, C), subObject -> Map(subField -> Value), field1 -> Sally, field3 -> 27, sales -> List( Map( firstName -> Sally, lastName -> Green, age -> 27 ), Map( firstName -> Jim, lastName -> Galley, age -> 41) ), field2 -> Green )
目前,我正在实现以下目标。 JsonUtil是Jackson API的包装器]
val dataframeAsJsonDataset:Dataset[String] = dataframe.toJSON val result:Dataset[Map[String,Any]] = dataframeAsJsonDataset.map(each=>JsonUtils.fromJson(each,classOf[Map[String,Any]]))
上述方法确实很糟糕,效果也很差。关于此的任何建议将非常有帮助。
[有什么方法可以将Spark数据框转换为数据集[Map [String,Any]],以便一旦将其转换为Map后,就可以在该行上执行地图端作业操作。文件的模式主要是易变的...
如果您的方案不断发展,则可能是数据质量问题。这要求使用spark-records之类的东西通过适当的错误检查来自定义分析数据。