如何按类别在spark [duplicate]中按指定的数量进行抽样

问题描述 投票:-1回答:2

这个问题在这里已有答案:

我有一个DataFrame,其中包含n属性(列)。 DataFrame的总数可能每天都有所不同(增加)。假设我有一个category列,它有四种类型 - typea,typeb,typec和typed。

我想做的数据取样取决于类别。

让我们考虑一下DataFrame中有10k行,我决定从DF获得100行。

那么下面应该是进行抽样的逻辑。

100(最终行)/ 4(不同类别类型)= 25(每类别);

最终的DataFrame应该只包含100行,每个类别只应包含25行。

apache-spark apache-spark-sql pyspark-sql apache-spark-ml
2个回答
-1
投票

我们首先按类别列对行进行分组,然后从每个类别中选择25行。

Java版本:

    Dataset<Row> df = sparkSession.read().json("things.json");
    Map<Object, List<Row>> rowsPerCategory = df.javaRDD()
            .groupBy(row -> row.getAs("category"))
            .mapToPair(rowsByCategory -> {
                List<Row> sampleRows = new LinkedList<>();
                Iterator<Row> rows = rowsByCategory._2().iterator();
                for (int i = 0; i < 25; i++)
                    if (rows.hasNext())
                        sampleRows.add(rows.next());
                return new Tuple2<>(rowsByCategory._1(), sampleRows);
            }).collectAsMap();

-1
投票

您可以尝试将其作为通用解决方案。

var totalSize = 100;
var categories = df.select($"category").distinct.collect
val desiredDf = categories.map(_.get(0)).map(cat => df.filter($"category" === cat)).map(dff => dff.limit(totalSize/categories)).reduce((df1,df2) => df1.union(df2))
© www.soinside.com 2019 - 2024. All rights reserved.