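I have 2 DataFrames.
DF1 has the columns
A , B , C
a1 , b1 , c1
DF2 has the columns
D , E , F , G
d1 , e1 , f1 , g1
I want to combine the two datasets so that the final dataset becomes
A , B , C , D , E , F , G
a1 , b1 , c1 , null , null , null , null
null , null , null , d1 , e1 , f1 , g1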
I have tried this code:
Set<String> df1Cols = Arrays.stream(df1.columns()).collect(Collectors.toSet());
Set<String> df2Cols = Arrays.stream(df2.columns()).collect(Collectors.toSet());
Set<String> allCols = Stream.concat(df1Cols.stream(), df2Cols.stream())
        .collect(Collectors.toSet());
Dataset<Row> df3 = df1.select(
        expr(df1Cols, allCols)
).union(
        df2.select(
                expr(df2Cols, allCols)
        )
);

private String expr(Set<String> myCols, Set<String> allCols) {
    List<String> clist = new ArrayList<>();
    for (String x : allCols) {
        if (myCols.contains(x)) {
            clist.add(x.toString());
        } else {
            clist.add(lit(null).as(x.toString()).toString());
        }
    }
    String res = String.join(",", clist).replace("`", "").replace("`", "");
    System.out.println("XXXXXX" + res);
    return res;
}
This doesn't seem to work because I can't pass the list of columns. Am I missing something, or what is the standard pattern for doing this in Java?
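In your snippet, expr returns one String: for every name in allCols it uses the column name if the DataFrame has that column and the text of a null literal otherwise, and joins everything with commas.
However, select does not accept several columns packed into one String. With select(String col, String... cols) each String is a single column name, so the comma-joined string is not split into columns and cannot be resolved. Return a List<Column> instead and then convert it to a Scala Seq.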
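The Scala Seq conversion is not the only option. Dataset.select also has a varargs overload, select(Column... cols), so from Java a List<Column> can be passed as an array without any Scala collection conversion. A minimal sketch, assuming the Spark SQL Java API is on the classpath; the class SelectFromList and its method alignTo are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

public class SelectFromList {
    // Hypothetical helper: project df onto allCols, in the given order, using a
    // null literal (aliased to the column name) for every column df lacks.
    public static Dataset<Row> alignTo(Dataset<Row> df, List<String> allCols) {
        Set<String> myCols = new HashSet<>(Arrays.asList(df.columns()));
        List<Column> cols = new ArrayList<>();
        for (String name : allCols) {
            cols.add(myCols.contains(name) ? col(name) : lit(null).as(name));
        }
        // select(Column...) is varargs, so the list is passed as a Column[]
        // and no JavaConversions/JavaConverters call is needed.
        return df.select(cols.toArray(new Column[0]));
    }
}
```

For the data in the question, aligning both DataFrames to the same list, e.g. Arrays.asList("A", "B", "C", "D", "E", "F", "G"), and then calling union on the two results gives the same shape as the Seq-based version.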
Here is a snippet that does what you are trying to do:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Scala 2.11/2.12 only: deprecated in Scala 2.12 and removed in Scala 2.13.
import scala.collection.JavaConversions;

import static org.apache.spark.sql.functions.*;
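// Build one Column per name in allCols: the real column if this DataFrame has it,
// otherwise a null literal aliased to that name, so both DataFrames end up with
// the same columns in the same order.
private static List<Column> expr(Set<String> myCols, Set<String> allCols) {
    List<Column> clist = new ArrayList<>();
    for (String x : allCols) {
        if (myCols.contains(x)) {
            clist.add(new Column(x));
        } else {
            clist.add(lit(null).as(x));
        }
    }
    return clist;
}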
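Because allCols is built with Collectors.toSet(), its iteration order is the HashSet's order. Both selects iterate that same set, so the positional union still lines up, but the column order of the result is not guaranteed in general (with single-letter names it happens to come out as A to G). If a predictable order matters, one option is a LinkedHashSet that keeps the first DataFrame's columns first and appends the second's new ones. A plain-Java sketch of that ordering step; columnOrder is a hypothetical helper name.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class ColumnOrder {
    // Hypothetical helper: ordered union of two column-name arrays. The
    // LinkedHashSet keeps insertion order and ignores names already present,
    // so a shared column appears once, at its position in the first array.
    public static Set<String> columnOrder(String[] firstCols, String[] secondCols) {
        Set<String> ordered = new LinkedHashSet<>(Arrays.asList(firstCols));
        ordered.addAll(Arrays.asList(secondCols));
        return ordered;
    }

    public static void main(String[] args) {
        Set<String> all = columnOrder(new String[] {"A", "B", "C"},
                                      new String[] {"D", "E", "F", "G"});
        System.out.println(all); // [A, B, C, D, E, F, G]
    }
}
```

With Spark, the result of ColumnOrder.columnOrder(first.columns(), second.columns()) can be passed to expr unchanged, since a LinkedHashSet is still a Set<String>.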
Call it with test data built from your example:
// Local SparkSession for trying this out; adjust master/appName for your environment.
SparkSession spark = SparkSession.builder().master("local[*]").appName("merge-example").getOrCreate();

// First DataFrame: columns A, B, C
StructField[] structFieldsFirst = new StructField[3];
structFieldsFirst[0] = new StructField("A", DataTypes.StringType, true, Metadata.empty());
structFieldsFirst[1] = new StructField("B", DataTypes.StringType, true, Metadata.empty());
structFieldsFirst[2] = new StructField("C", DataTypes.StringType, true, Metadata.empty());
StructType firstSchema = new StructType(structFieldsFirst);
List<Row> firstRows = new ArrayList<>();
Object[] objectTest = new Object[] { "a1", "b1", "c1" };
firstRows.add(RowFactory.create(objectTest));
Dataset<Row> first = spark.createDataFrame(firstRows, firstSchema);
first.show(10, false);

// Second DataFrame: columns D, E, F, G
StructField[] structFieldsSecond = new StructField[4];
structFieldsSecond[0] = new StructField("D", DataTypes.StringType, true, Metadata.empty());
structFieldsSecond[1] = new StructField("E", DataTypes.StringType, true, Metadata.empty());
structFieldsSecond[2] = new StructField("F", DataTypes.StringType, true, Metadata.empty());
structFieldsSecond[3] = new StructField("G", DataTypes.StringType, true, Metadata.empty());
StructType secondSchema = new StructType(structFieldsSecond);
List<Row> secondRows = new ArrayList<>();
Object[] objectSecond = new Object[] { "d1", "e1", "f1", "g1" };
secondRows.add(RowFactory.create(objectSecond));
Dataset<Row> second = spark.createDataFrame(secondRows, secondSchema);
second.show(10, false);

// Union of both column sets; both expr calls iterate this same set,
// so they produce the columns in the same order.
Set<String> firstCols = Arrays.stream(first.columns()).collect(Collectors.toSet());
Set<String> secondCols = Arrays.stream(second.columns()).collect(Collectors.toSet());
Set<String> allCols = Stream.concat(firstCols.stream(), secondCols.stream()).collect(Collectors.toSet());
List<Column> firstDFColumns = expr(firstCols, allCols);
List<Column> secondDFColumns = expr(secondCols, allCols);

// select(Seq<Column>) overload: convert each Java List to a Scala Seq.
Dataset<Row> firstDFAllColumns = first.select(JavaConversions.asScalaBuffer(firstDFColumns).seq());
Dataset<Row> secondDFAllColumns = second.select(JavaConversions.asScalaBuffer(secondDFColumns).seq());

// union() matches columns by position, which is why the column order must agree.
Dataset<Row> mergedDF = firstDFAllColumns.union(secondDFAllColumns);
mergedDF.show(10, false);
This produces the following output:
+---+---+---+
|A  |B  |C  |
+---+---+---+
|a1 |b1 |c1 |
+---+---+---+

+---+---+---+---+
|D  |E  |F  |G  |
+---+---+---+---+
|d1 |e1 |f1 |g1 |
+---+---+---+---+

+----+----+----+----+----+----+----+
|A   |B   |C   |D   |E   |F   |G   |
+----+----+----+----+----+----+----+
|a1  |b1  |c1  |null|null|null|null|
|null|null|null|d1  |e1  |f1  |g1  |
+----+----+----+----+----+----+----+
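If you are on Spark 3.1 or later, there is also a built-in for this: Dataset.unionByName(other, allowMissingColumns). It matches columns by name instead of by position and, when allowMissingColumns is true, fills columns that exist on only one side with null, so neither the expr helper nor the Seq conversion is needed. A minimal sketch, assuming Spark 3.1+; the class name UnionMissingColumns and the spark.sql test data are illustrative only.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UnionMissingColumns {
    // Requires Spark 3.1+: the allowMissingColumns overload does not exist earlier.
    public static Dataset<Row> merge(Dataset<Row> first, Dataset<Row> second) {
        // Columns are matched by name; a column absent on one side becomes null there.
        return first.unionByName(second, true);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("union-missing-columns")
                .getOrCreate();
        Dataset<Row> first = spark.sql("SELECT 'a1' AS A, 'b1' AS B, 'c1' AS C");
        Dataset<Row> second = spark.sql("SELECT 'd1' AS D, 'e1' AS E, 'f1' AS F, 'g1' AS G");
        merge(first, second).show(10, false);
        spark.stop();
    }
}
```

The result keeps the first DataFrame's columns first and appends the columns that only the second one has, so for this data it should match the merged table above.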