I want to use .collect_list() without having to type or paste every column title as input, because my data has far too many columns for that to be practical. How can I pass many column titles as input to this function? My data is enormous: 55 billion rows and 2,300 columns, so if possible I'd like a solution that scales to big data.
Example:
import org.apache.spark.sql.functions._
val Enormous_df = Seq(("Apples", "a", "b"),
("Apples", "a", "d"),
("Banana", "e", "f"))
.toDF("Grouping_by_this_Column", "One_of_Thousands_of_Columns", "Second_of_Thousands_of_Columns")
Enormous_df.show()
+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
| Apples| a| b|
| Apples| a| d|
| Banana| e| f|
+-----------------------+---------------------------+------------------------------+
Continuing the example:
//Creating a sequence of the column titles I desire. In this case, it's all of them.
val All_Thousands_of_Column_Titles = Seq(Enormous_df.drop("Grouping_by_this_Column").columns)
//This doesn't work. How do I do this?
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))
The error I get is:
command-33353371:11: error: overloaded method value collect_list with alternatives:
(columnName: String)org.apache.spark.sql.Column <and>
(e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
cannot be applied to (Seq[Array[String]])
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))
The df I want to end up with is:
+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                     [a, a]|                        [b, d]|
|                 Banana|                        [e]|                           [f]|
+-----------------------+---------------------------+------------------------------+
A working pyspark equivalent of what I want to do is:
import pyspark.pandas as ps
Enormous_df=ps.DataFrame({"Grouping_by_this_Column":["Apples","Apples","Banana"],
"One_of_Thousands_of_Columns":["a","a","e"],
"Second_of_Thousands_of_Columns":["b","d","f"]})\
.to_spark()
The_df_I_Want= Enormous_df.groupby("Grouping_by_this_Column").agg({Column_Title:"collect_list" \
for Column_Title in Enormous_df.drop("Grouping_by_this_Column")\
.columns})\
.withColumnRenamed("collect_list(One_of_Thousands_of_Columns)","One_of_Thousands_of_Columns")\
.withColumnRenamed("collect_list(Second_of_Thousands_of_Columns)","Second_of_Thousands_of_Columns")
The_df_I_Want.show()
If I'm not mistaken, what you want is not agg(collect_list(colA, colB, ...)) (which is not possible), but rather agg(collect_list(colA), collect_list(colB), ...).
You can first build a list of collect_list aggregations from your column titles, and then pass that list to agg:
// Build a flat Seq[String] of titles (note: no extra Seq(...) wrapper around .columns)
val columnTitles = Enormous_df.drop("Grouping_by_this_Column").columns.toSeq
val aggregations = columnTitles.map(c => collect_list(c))
// agg takes (Column, Column*), so pass the head and the rest separately
Enormous_df.groupBy("Grouping_by_this_Column").agg(aggregations.head, aggregations.tail: _*)
I haven't tested this, but that's the idea.
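Putting it together, here is a minimal end-to-end sketch (assuming a live SparkSession named spark, needed for the implicits import). Aliasing each collect_list back to its source column name avoids the withColumnRenamed chain used in the pyspark version:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val Enormous_df = Seq(("Apples", "a", "b"),
                      ("Apples", "a", "d"),
                      ("Banana", "e", "f"))
  .toDF("Grouping_by_this_Column", "One_of_Thousands_of_Columns", "Second_of_Thousands_of_Columns")

// Build one collect_list aggregation per column, skipping the grouping key,
// and alias each result back to the original column name.
val aggregations = Enormous_df.columns
  .filter(_ != "Grouping_by_this_Column")
  .map(c => collect_list(c).alias(c))

val The_df_I_Want = Enormous_df
  .groupBy("Grouping_by_this_Column")
  .agg(aggregations.head, aggregations.tail: _*)

The_df_I_Want.show()
```

The same pattern works no matter how many columns there are, since aggregations is built programmatically from Enormous_df.columns rather than typed out by hand.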