Spark / Scala在不同的数据子集上使用相同的函数重复创建DataFrame

问题描述 投票:1回答:1

我当前的代码使用相同的函数重复创建新的DataFrames(df_1,df_2,df_3),但应用于原始DataFrame df的不同子集(例如where(“category == 1'))。

我想创建一个可以自动创建这些DataFrame的函数。

在以下示例中,My DataFrame df有三列:“category”,“id”和“amount”。假设我有10个类别。我想总结列'类别'的值,并根据不同的类别计算'类别'的出现次数:

val df_1 = df.where("category == 1")
.groupBy("id")
.agg(sum(when(col("amount") > 0,(col("amount")))).alias("total_incoming_cat_1"),
count(when(col("amount") < 0, (col("amount")))).alias("total_outgoing_cat_1"))

val df_2 = df.where("category == 2")
.groupBy("id")
.agg(sum(when(col("amount") > 0,(col("amount")))).alias("total_incoming_cat_2"),
count(when(col("amount") < 0, (col("amount")))).alias("total_outgoing_cat_2"))

val df_3 = df.where("category == 3")
.groupBy("id")
.agg(sum(when(col("amount") > 0, (col("amount")))).alias("total_incoming_cat_3"),
count(when(col("amount") < 0, (col("amount")))).alias("total_outgoing_cat_3"))

我想要这样的东西:

def new_dfs(L:List, df:DataFrame): DataFrame={
  for l in L{
    val df_+l df.filter($amount == l)
    .groupBy("id")
    .agg(sum(when(col("amount") > 0, (col("amount")))).alias("total_incoming_cat_"+l),
    count(when(col("amount") < 0, (col("amount")))).alias("total_outgoing_cat_"+l))
  }
}
scala apache-spark
1个回答
1
投票

按类别和ID分组并不是更好

df
.groupBy("category","id")
.agg(sum(when(col("amount") > 0,(col("amount")))).alias("total_incoming_cat"),
count(when(col("amount") < 0, (col("amount")))).alias("total_outgoing_cat"))
© www.soinside.com 2019 - 2024. All rights reserved.