这个问题在这里已有答案:
我有一个DataFrame
,其中包含n
属性(列)。 DataFrame
的总数可能每天都有所不同(增加)。假设我有一个category
列,它有四种类型 - typea,typeb,typec和typed。
我想做的数据取样取决于类别。
让我们考虑一下DataFrame
中有10k行,我决定从DF获得100行。
那么下面应该是进行抽样的逻辑。
100(最终行)/ 4(不同类别类型)= 25(每类别);
最终的DataFrame
应该只包含100行,每个类别只应包含25行。
我们首先按类别列对行进行分组,然后从每个类别中选择25行。
Java版本:
Dataset<Row> df = sparkSession.read().json("things.json");
Map<Object, List<Row>> rowsPerCategory = df.javaRDD()
.groupBy(row -> row.getAs("category"))
.mapToPair(rowsByCategory -> {
List<Row> sampleRows = new LinkedList<>();
Iterator<Row> rows = rowsByCategory._2().iterator();
for (int i = 0; i < 25; i++)
if (rows.hasNext())
sampleRows.add(rows.next());
return new Tuple2<>(rowsByCategory._1(), sampleRows);
}).collectAsMap();
您可以尝试将其作为通用解决方案。
var totalSize = 100;
var categories = df.select($"category").distinct.collect
val desiredDf = categories.map(_.get(0)).map(cat => df.filter($"category" === cat)).map(dff => dff.limit(totalSize/categories)).reduce((df1,df2) => df1.union(df2))