I am working with Spark code in Java. After a join condition, we get multiple records because of duplicate IDs coming from the different sources (the ID is a duplicate, but some of the attributes have changed), so for the same ID we end up with several records. What I need is a single record per ID, with the differing values collected together.
Input dataset
+---+---+---+----+---+---+
|id |b |c |d |f |g |
+---+---+---+----+---+---+
|1 |e |dd |ddd |34 |r5t|
|1 |e |dd2|ddd |34 |r5t|
|1 |e |dd3|ddd |34 |rt |
|2 |e |dd |ddd1|34 |5rt|
|4 |e |dd |ddd1|34 |rt |
|1 |e |dd4|ddd |34 |rt |
|4 |e |dd4|ddd |34 |rt |
|4 |e |dd4|ddd |3 |rt |
|2 |e |dd |ddd |3 |r5t|
|2 |e |dd |ddd |334|rt |
+---+---+---+----+---+---+
Expected output
+---+----+---+----------+-------------------+-----+
|id |f   |b  |g         |c                  |d    |
+---+----+---+----------+-------------------+-----+
|1  |[34]|[e]|[r5t, rt] |[dd4, dd3, dd2, dd]|[ddd]|
+---+----+---+----------+-------------------+-----+
I tried listing the collect_set calls explicitly, as follows:
df.groupBy("id").agg(
    functions.collect_set("f"),
    functions.collect_set("b")
).show(1, false);
But in my case the dataset has around 300 columns, and which columns differ is not fixed; it varies from record to record.
In the org.apache.spark.sql package, Dataset has an agg(exprs: Map[String, String]) overload that accepts a Map<String, String> in which the key is a column name and the value is the name of an aggregate function from sql.functions:

Dataset<Row> df = spark.read().format("csv").option("header", "true")
        .load("...");
Map<String, String> collect_MAP = Arrays.stream(df.columns())
        .filter(f -> !f.equals("id"))
        .collect(Collectors.toMap(f -> f, f -> "collect_set"));
df.groupBy("id").agg(collect_MAP).show(false);
Result
+---+--------------+--------------+--------------+-------------------+--------------+
|id |collect_set(f)|collect_set(b)|collect_set(g)|collect_set(c)     |collect_set(d)|
+---+--------------+--------------+--------------+-------------------+--------------+
|1  |[34]          |[e]           |[r5t, rt]     |[dd4, dd3, dd2, dd]|[ddd]         |
|4  |[3, 34]       |[e]           |[rt]          |[dd4, dd]          |[ddd1, ddd]   |
|2  |[334, 3, 34]  |[e]           |[r5t, rt, 5rt]|[dd]               |[ddd1, ddd]   |
+---+--------------+--------------+--------------+-------------------+--------------+
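The map-building step does not need Spark at all, so it can be sketched and tested as a small standalone helper. This is a minimal sketch; AggMapBuilder and buildAggMap are illustrative names I introduced, and only the stream logic comes from the answer above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AggMapBuilder {

    // Build a column-name -> aggregate-function-name map suitable for
    // Dataset.agg(Map<String, String>), skipping the grouping columns.
    static Map<String, String> buildAggMap(String[] columns, String aggFn, String... groupCols) {
        List<String> group = Arrays.asList(groupCols);
        return Arrays.stream(columns)
                .filter(c -> !group.contains(c))
                .collect(Collectors.toMap(c -> c, c -> aggFn));
    }

    public static void main(String[] args) {
        // Columns as in the sample dataset above.
        String[] cols = {"id", "b", "c", "d", "f", "g"};
        Map<String, String> aggMap = buildAggMap(cols, "collect_set", "id");

        System.out.println(aggMap.size());            // 5 (every column except id)
        System.out.println(aggMap.get("b"));          // collect_set
        System.out.println(aggMap.containsKey("id")); // false
    }
}
```

Against a real Dataset this would be called as df.groupBy("id").agg(buildAggMap(df.columns(), "collect_set", "id")). Taking groupCols as a varargs parameter also covers the case where the grouping key spans more than one column.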