How to .collect_list() without typing out every column name in Scala Spark


I want to use .collect_list() without having to type or paste every column header as input, because my data has far too many column headers for that to be practical. How can I pass multiple column headers to this function at once? My data is enormous: 55 billion rows and 2,300 columns, so if possible I would like a solution that scales to big data.

Example:

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDF on a Seq; assumes a SparkSession named spark, as in spark-shell

val Enormous_df = Seq(("Apples", "a", "b"),
                      ("Apples", "a", "e"),
                      ("Banana", "d", "f"))
                      .toDF("Grouping_by_this_Column", "One_of_Thousands_of_Columns", "Second_of_Thousands_of_Columns")

Enormous_df.show()
+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                          a|                             b|
|                 Apples|                          a|                             e|
|                 Banana|                          d|                             f|
+-----------------------+---------------------------+------------------------------+

Continuing the example:

//Creating a sequence of the column titles I desire. In this case, it's all of them.
val All_Thousands_of_Column_Titles = Seq(Enormous_df.drop("Grouping_by_this_Column").columns)

//This doesn't work. How do I do this?
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))

The error I get is:

command-33353371:11: error: overloaded method value collect_list with alternatives:
  (columnName: String)org.apache.spark.sql.Column <and>
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (Seq[Array[String]])
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))

The df I want is:

+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                     [a, a]|                        [b, e]|
|                 Banana|                        [d]|                           [f]|
+-----------------------+---------------------------+------------------------------+

A working PySpark equivalent of what I want to do is:

import pyspark.pandas as ps
Enormous_df=ps.DataFrame({"Grouping_by_this_Column":["Apples","Apples","Banana"],
                          "One_of_Thousands_of_Columns":["a","a","d"],
                          "Second_of_Thousands_of_Columns":["b","e","f"]})\
              .to_spark()

The_df_I_Want= Enormous_df.groupby("Grouping_by_this_Column").agg({Column_Title:"collect_list" \
                                                                   for Column_Title in Enormous_df.drop("Grouping_by_this_Column")\
                                                                                                  .columns})\
                          .withColumnRenamed("collect_list(One_of_Thousands_of_Columns)","One_of_Thousands_of_Columns")\
                          .withColumnRenamed("collect_list(Second_of_Thousands_of_Columns)","Second_of_Thousands_of_Columns")

The_df_I_Want.show()
1 Answer

If I understand correctly, what you want is not agg(collect_list(colA, colB, ...)) (which is not possible), but rather agg(collect_list(colA), collect_list(colB), ...).

You can first build a list of collect_list aggregations from All_Thousands_of_Column_Titles and then pass it to agg:

// Rebuild the titles as a flat Seq[String]; the Seq(...) wrapper caused the Seq[Array[String]] error
val All_Thousands_of_Column_Titles = Enormous_df.drop("Grouping_by_this_Column").columns.toSeq
val aggregations = All_Thousands_of_Column_Titles.map(c => collect_list(c).as(c))

// agg takes a first Column followed by varargs, hence head/tail
Enormous_df.groupBy("Grouping_by_this_Column").agg(aggregations.head, aggregations.tail: _*)

I haven't tested it, but that's the idea.
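
For reference, here is a minimal end-to-end sketch of that idea on the example data. It is only a sketch: it assumes a SparkSession named spark is in scope (as in spark-shell), and the .as(c) aliases are simply there so the aggregated columns keep their original names.

import org.apache.spark.sql.functions._
import spark.implicits._

val Enormous_df = Seq(("Apples", "a", "b"),
                      ("Apples", "a", "e"),
                      ("Banana", "d", "f"))
  .toDF("Grouping_by_this_Column", "One_of_Thousands_of_Columns", "Second_of_Thousands_of_Columns")

// Every column except the grouping column, as a flat Seq[String]
val valueColumns = Enormous_df.columns.filterNot(_ == "Grouping_by_this_Column").toSeq

// One collect_list per column, aliased back to the original column name
val aggregations = valueColumns.map(c => collect_list(c).as(c))

// agg needs a first Column plus varargs, so split head and tail
val The_df_I_Want = Enormous_df
  .groupBy("Grouping_by_this_Column")
  .agg(aggregations.head, aggregations.tail: _*)

The_df_I_Want.show()
// Expected output (row and list ordering are not guaranteed by collect_list):
// +-----------------------+---------------------------+------------------------------+
// |Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
// +-----------------------+---------------------------+------------------------------+
// |                 Apples|                     [a, a]|                        [b, e]|
// |                 Banana|                        [d]|                           [f]|
// +-----------------------+---------------------------+------------------------------+

With 2,300 columns this builds the 2,299 aggregation expressions programmatically, which is the point of the question; whether collecting lists over 55 billion rows fits in memory per group is a separate concern.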
