Spark展平数据集映射的列

问题描述 投票:0回答:1

我有带模式的RDD-

Schema: {
  "type" : "struct",
  "fields" : [ 

 {
    "name" : "cola",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "mappedcol",
    "type" : {
      "type" : "map",
      "keyType" : "string",
      "valueType" : "string",
      "valueContainsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "colc",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }]
  }

样本值:

{
cola : A1,
mappedcol : { mapped1: M1, mapped2: M2, mapped3: M3  }
colc : C1
}

我想将mapdcols中的键上移一级。基本上将所有列平整到一个级别。

cola, mapped1, mapped2, mapped3, colc
A1, M1,M2,M3, C1

在Java中有一种优雅的方法吗?

java apache-spark rdd
1个回答
0
投票

可以使用点语法来访问嵌套结构的单个元素,例如select mappedcol.mapped1将返回M1。这个想法是使用这种点语法将模式转换为列名列表:

private static List<String> structToColNames(StructField[] fields, String prefix) {
    List<String> columns = new ArrayList<>();
    for( StructField field: fields) {
        String fieldname = field.name();
        if( field.dataType() instanceof StructType) {
            columns.addAll(
                structToColNames(((StructType)field.dataType()).fields(), 
                    prefix + fieldname + "."));
        }
        else {
            columns.add(prefix + fieldname);
        }
    }
    return columns;
}

此功能的结果可用于选择数据:

Dataset<Row> df = spark.read().json(<path to json>);
StructField[] fields = df.schema().fields();
List<String> colNames = structToColNames(fields, "");
System.out.println(colNames);
Column[] columns = colNames.stream().map(s -> col(s)).toArray(Column[]::new);
df.select(columns).show();

打印

[cola, colc, mappedcol.mapped1, mappedcol.mapped2, mappedcol.mapped3]

+----+----+-------+-------+-------+
|cola|colc|mapped1|mapped2|mapped3|
+----+----+-------+-------+-------+
|  A1|  C1|     M1|     M2|     M3|
+----+----+-------+-------+-------+
© www.soinside.com 2019 - 2024. All rights reserved.