spark Java中多列的聚合

问题描述 投票:0回答:1

我有动态列的列表priceColumns。我想在Dataset中聚合这些列,

public Dataset getAgg(RelationalGroupedDataset rlDataset){
Dataset selectedDS=null;
    for(String priceCol :priceColumns){
            selectedDS=rlDataset.agg(expr("sum(cast("+priceCol+" as BIGINT))"));
        }
return selectedDS;
}

上面的代码是一个不正确的代码,我在这里要做的是,基于每个Columns,聚合应该发生在该数据集中,如何编写通用代码?我完全被困在这里了。

java apache-spark group-by aggregate apache-spark-dataset
1个回答
0
投票

我尝试了下面的方式,它解决了。

    List<Column> columnExpr = priceColumns.stream()
                             .map(col->expr("sum(cast("+col+" as BIGINT))").as(col))
                             .collect(Collectors.toList());

然后,

selectedDS= rlDataset
                    .agg(columnExpr.get(0),
                JavaConverters.asScalaIteratorConverter(columnExpr.subList(1, columnExpr.size())
                    .iterator()).asScala().toSeq());
© www.soinside.com 2019 - 2024. All rights reserved.