Spark-scala聚合列表中的多个列[重复]

问题描述 投票:0回答:2

这个问题在这里已有答案:

我有一个数据框,其中包含几个未修复的数字列(它们可以在每次执行期间更改)。假设我有一个带有数字列名称的Seq对象。我想为每个列应用聚合函数。我尝试过以下方法:

println(numeric_cols)
// -> Seq[String] = List(avgTkts_P1, avgTkts_P2, avgTkts_P3, avgTkts_P4)

var sum_ops = for (c <- numeric_cols) yield org.apache.spark.sql.functions.sum(c).as(c)

var result = df.groupBy($"ID").agg( sum_ops:_* )

但它给了我以下错误:

scala> var avgTktsPerPeriodo = df.groupBy("ID").agg(sum_ops:_*)
<console>:79: error: overloaded method value agg with alternatives:
  (expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame <and>
  (exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame <and>
  (exprs: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame <and>
  (aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.Column)

不知道这是否可以在spark-scala中使用?

scala apache-spark aggregate
2个回答
1
投票

如果你看一个签名:

(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame

第一个参数是Column表达式,第二个参数是varargs。

你需要做一些事情:

val result = df.groupBy($"ID").agg( sum_ops.head, sum_ops.tail:_* )

0
投票

好的找到了解决方案(Spark中的agg函数接受Map [colname - > operation]):

var agg_ops =  numeric_cols map (c => c -> "sum") toMap

var result = df.groupBy($"ID").agg( agg_ops )
© www.soinside.com 2019 - 2024. All rights reserved.