展平嵌套的Spark Dataframe

问题描述 投票:6回答:3

有没有办法压缩任意嵌套的Spark Dataframe?我所看到的大部分工作都是针对特定的模式编写的,我希望能够通过不同的嵌套类型(例如StructType,ArrayType,MapType等)来泛化一个Dataframe。

假设我有一个类似的架构:

StructType(List(StructField(field1,...), StructField(field2,...), ArrayType(StructType(List(StructField(nested_field1,...), StructField(nested_field2,...)),nested_array,...)))

希望将其调整为具有如下结构的平台:

field1
field2
nested_array.nested_field1
nested_array.nested_field2

仅供参考,寻找Pyspark的建议,但其他风味的Spark也值得赞赏。

apache-spark pyspark spark-dataframe
3个回答
13
投票

这个问题可能有点旧,但对于那些仍在寻找解决方案的人来说,你可以使用select *来内联复杂的数据类型:

首先让我们创建嵌套的数据帧:

from pyspark.sql import HiveContext
hc = HiveContext(sc)
nested_df = hc.read.json(sc.parallelize(["""
{
  "field1": 1, 
  "field2": 2, 
  "nested_array":{
     "nested_field1": 3,
     "nested_field2": 4
  }
}
"""]))

现在要压扁它:

flat_df = nested_df.select("field1", "field2", "nested_array.*")

你会在这里找到有用的例子:https://docs.databricks.com/delta/data-transformation/complex-types.html

如果嵌套数组太多,可以使用:

flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(*flat_cols, *[c + ".*" for c in nested_cols])

1
投票

这是我最后的方法:

1)将数据帧中的行映射到dict的rdd。在线找到合适的python代码来展平字典。

flat_rdd = nested_df.map(lambda x : flatten(x))

哪里

def flatten(x):
  x_dict = x.asDict()
  ...some flattening code...
  return x_dict

2)将RDD [dict]转换回数据帧

flat_df = sqlContext.createDataFrame(flat_rdd)

-1
投票

以下要点将展平嵌套json的结构,

import typing as T

import cytoolz.curried as tz
import pyspark


def schema_to_columns(schema: pyspark.sql.types.StructType) -> T.List[T.List[str]]:
    """
    Produce a flat list of column specs from a possibly nested DataFrame schema
    """

    columns = list()

    def helper(schm: pyspark.sql.types.StructType, prefix: list = None):

        if prefix is None:
            prefix = list()

        for item in schm.fields:
            if isinstance(item.dataType, pyspark.sql.types.StructType):
                helper(item.dataType, prefix + [item.name])
            else:
                columns.append(prefix + [item.name])

    helper(schema)

    return columns

def flatten_frame(frame: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:

    aliased_columns = list()

    for col_spec in schema_to_columns(frame.schema):
        c = tz.get_in(col_spec, frame)
        if len(col_spec) == 1:
            aliased_columns.append(c)
        else:
            aliased_columns.append(c.alias(':'.join(col_spec)))

    return frame.select(aliased_columns)

然后,您可以将嵌套数据展平为

flatten_data = flatten_frame(nested_df)

这将为您提供展平的数据框。

要点取自https://gist.github.com/DGrady/b7e7ff3a80d7ee16b168eb84603f5599

© www.soinside.com 2019 - 2024. All rights reserved.