Joining two datasets with different columns in Spark in Java


I have two dataframes.

DF1 has columns

A , B , C
a1 , b1 , c1

DF2

D , E , F , G
d1, e1, f1 , g1

I want to join the datasets so that the final dataset becomes

A  ,  B  , C ,  D  ,  E ,   F   , G
a1  , b1  ,c1,  null, null, null,null
null, null ,null ,d1, e1,  f1,  g1

I have tried this code:

Set<String> df1Cols = Arrays
    .stream(df1.columns()).collect(Collectors.toSet());

Set<String> df2Cols = Arrays
    .stream(df2.columns()).collect(Collectors.toSet());

Set<String> allCols = Stream.concat(df1Cols.stream(), df2Cols.stream())
    .collect(Collectors.toSet());

Dataset<Row> df3 = df1.select(
        expr(df1Cols, allCols)
    ).union(
        df2.select(
            expr(df2Cols, allCols)
        )
    );


  private String expr(Set<String> myCols, Set<String> allCols) {
    List<String> clist = new ArrayList<>();
    for(String x : allCols) {
      if (myCols.contains(x)) {
        clist.add(x.toString());
      } else {
        clist.add(lit(null).as(x.toString()).toString());
      }
    }
    String res = String.join(",", clist).replace("`","").replace("`","");
    System.out.println("XXXXXX"+res);
    return  res;
  }

This doesn't seem to work, since I can't select the columns this way. Am I missing something, or what is the standard pattern for doing this in Java?

java apache-spark
1 Answer

Based on your snippet, your expr function builds one comma-separated String of all the columns: the column name where the dataset has it, and a null literal aliased to the missing name otherwise (for DF1 it would print something like A,B,C,NULL AS D,NULL AS E,NULL AS F,NULL AS G, with the order depending on the set).

But select does not accept the columns as one String. You have to return a List<Column> and then convert it to a Seq.

Here is a code snippet that achieves what you are trying to do:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConversions;
import static org.apache.spark.sql.functions.*;

// For every column in the combined schema, pick the column itself when the
// dataset has it, otherwise a null literal aliased to the missing name.
private static List<Column> expr(Set<String> myCols, Set<String> allCols) {
    List<Column> clist = new ArrayList<>();
    for (String x : allCols) {
        if (myCols.contains(x)) {
            clist.add(new Column(x));
        } else {
            clist.add(lit(null).as(x));
        }
    }
    return clist;
}
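
As a side note, Dataset.select also has a Column... varargs overload (generated via @scala.annotation.varargs), so if JavaConversions is not available to you (it was removed in Scala 2.13), you can skip the Seq conversion entirely. A minimal sketch, reusing first, firstCols, and allCols as defined in the test code below; firstPadded is just an illustrative name:

    // Alternative to the Seq conversion: select(Column... cols) is also
    // part of the Dataset API, so the List<Column> can go in as an array.
    Dataset<Row> firstPadded = first.select(expr(firstCols, allCols).toArray(new Column[0]));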

Calling this with test code taken from your snippet:

    // Assumes an existing SparkSession named "spark".

    // Build the first test dataset with columns A, B, C.
    StructField[] structFieldsFirst = new StructField[3];
    structFieldsFirst[0] = new StructField("A", DataTypes.StringType, true, Metadata.empty());
    structFieldsFirst[1] = new StructField("B", DataTypes.StringType, true, Metadata.empty());
    structFieldsFirst[2] = new StructField("C", DataTypes.StringType, true, Metadata.empty());
    StructType firstSchema = new StructType(structFieldsFirst);

    List<Row> firstRows = new ArrayList<>();
    Object[] objectTest = new Object[] { "a1", "b1", "c1" };
    firstRows.add(RowFactory.create(objectTest));

    Dataset<Row> first = spark.createDataFrame(firstRows, firstSchema);
    first.show(10, false);

    // Build the second test dataset with columns D, E, F, G.
    StructField[] structFieldsSecond = new StructField[4];
    structFieldsSecond[0] = new StructField("D", DataTypes.StringType, true, Metadata.empty());
    structFieldsSecond[1] = new StructField("E", DataTypes.StringType, true, Metadata.empty());
    structFieldsSecond[2] = new StructField("F", DataTypes.StringType, true, Metadata.empty());
    structFieldsSecond[3] = new StructField("G", DataTypes.StringType, true, Metadata.empty());

    StructType secondSchema = new StructType(structFieldsSecond);

    List<Row> secondRows = new ArrayList<>();
    Object[] objectSecond = new Object[] { "d1", "e1", "f1", "g1" };
    secondRows.add(RowFactory.create(objectSecond));
    Dataset<Row> second = spark.createDataFrame(secondRows, secondSchema);
    second.show(10, false);

    // Collect the column names of each dataset, plus the union of both.
    Set<String> firstCols = Arrays.stream(first.columns()).collect(Collectors.toSet());
    Set<String> secondCols = Arrays.stream(second.columns()).collect(Collectors.toSet());
    Set<String> allCols = Stream.concat(firstCols.stream(), secondCols.stream()).collect(Collectors.toSet());

    // Pad each dataset out to the full column set. Both selects iterate the
    // same allCols set, so the column order matches and a positional union works.
    List<Column> firstDFColumns = expr(firstCols, allCols);
    List<Column> secondDFColumns = expr(secondCols, allCols);
    Dataset<Row> firstDFAllColumns = first.select(JavaConversions.asScalaBuffer(firstDFColumns).seq());
    Dataset<Row> secondDFAllColumns = second.select(JavaConversions.asScalaBuffer(secondDFColumns).seq());

    Dataset<Row> mergedDF = firstDFAllColumns.union(secondDFAllColumns);
    mergedDF.show(10, false);

This returns the following output:

+---+---+---+
|A  |B  |C  |
+---+---+---+
|a1 |b1 |c1 |
+---+---+---+

+---+---+---+---+
|D  |E  |F  |G  |
+---+---+---+---+
|d1 |e1 |f1 |g1 |
+---+---+---+---+


+----+----+----+----+----+----+----+
|A   |B   |C   |D   |E   |F   |G   |
+----+----+----+----+----+----+----+
|a1  |b1  |c1  |null|null|null|null|
|null|null|null|d1  |e1  |f1  |g1  |
+----+----+----+----+----+----+----+
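
As an aside: if you can use Spark 3.1 or later, unionByName now does all of this in one call, since its allowMissingColumns flag aligns columns by name and fills in nulls for columns missing on either side. A minimal sketch, assuming the same first and second datasets as above:

    // Spark 3.1+ only: align columns by name, null-filling whatever is
    // missing on either side; no manual column padding required.
    Dataset<Row> mergedByName = first.unionByName(second, true);
    mergedByName.show(10, false);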