How can I perform this groupBy + agg more efficiently using RDD functions?


Suppose I create a new DataFrame like this:

import random
import string

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

# get (or create) the SparkSession used below
spark = SparkSession.builder.getOrCreate()

data = []
for i in range(100_000):
    rid = string.ascii_uppercase[random.randint(0, len(string.ascii_uppercase) - 1)]
    # gen rand val for col1
    a = random.randint(1, 10)
    # gen rand val for col2
    b = random.randint(1, 10)
    # gen rand val for col3
    c = string.ascii_lowercase[random.randint(0, len(string.ascii_lowercase) - 1)]
    data.append((rid, a, b, c))

df = spark.createDataFrame(
    data=data,
    schema=["rid", "col1", "col2", "col3"],
)
df.printSchema()
df.orderBy(sf.col("rid")).show(truncate=False)

The query I'm currently running looks like this:

(
    df
    .select("*")
    .groupBy("rid")
    .agg(
        sf.collect_list(sf.col("col1")).alias("cnt1"),
        sf.collect_list(sf.col("col2")).alias("cnt2"),
        sf.collect_set(sf.col("col3")).alias("cnt3")
    )
    .select(
        "rid",
        custom_udf_1('cnt1'),
        custom_udf_2('cnt2'),
        custom_udf_3('cnt3')
    )
    ...
)

How can I do this more efficiently? The main goal is to do as much of the aggregation as possible within each partition... and only shuffle at the end, when we actually have to. I don't want to start with a huge shuffle... and only then begin aggregating.
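
A quick way to check how Spark actually plans this aggregation is to look at the physical plan (a side check only; the exact plan text varies by Spark version). collect_list and collect_set normally show up as a partial aggregate before the Exchange and a final aggregate after it.

# Inspect the physical plan: the aggregate typically appears twice,
# once in "partial" mode before the Exchange and once in "final" mode after it.
(
    df
    .groupBy("rid")
    .agg(
        sf.collect_list("col1").alias("cnt1"),
        sf.collect_list("col2").alias("cnt2"),
        sf.collect_set("col3").alias("cnt3"),
    )
    .explain()
)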

I think it should be possible to convert from a Spark DataFrame to an RDD so that I can use functions like groupByKey, reduceByKey, and combineByKey. However, I'm having trouble actually using these functions to achieve my goal.

This is the only thing I've tried, and it doesn't work. Note that it doesn't cover the UDFs, only the groupBy + aggregation part.

def createCombiner(row):
    return ([row["col1"]], [row["col2"]], {row["col3"]})

def mergeValue(accumulator, row):
    col1_list, col2_list, col3_set = accumulator
    col1_list.append(row["col1"])
    col2_list.append(row["col2"])
    col3_set.add(row["col3"])
    return (col1_list, col2_list, col3_set)

def mergeCombiners(accumulator1, accumulator2):
    col1_list1, col2_list1, col3_set1 = accumulator1
    col1_list2, col2_list2, col3_set2 = accumulator2
    return (col1_list1 + col1_list2, col2_list1 + col2_list2, col3_set1.union(col3_set2))


combined = df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
result = (
    combined
    .map(lambda x: (x[0], x[1][0], x[1][1], list(x[1][2])))
    .toDF(["rid", "collect_list(col1)", "collect_list(col2)", "collect_set(col3)"])
)
result.show()
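
For what it's worth, here is a minimal sketch of one possible fix, under the assumption that the problem is simply that combineByKey expects a pair RDD of (key, value) while df.rdd yields plain Row objects, so the rows need to be keyed by rid first:

# Key the rows by "rid" so combineByKey sees (key, value) pairs.
keyed = df.rdd.map(lambda row: (row["rid"], row))

combined = keyed.combineByKey(createCombiner, mergeValue, mergeCombiners)
result = (
    combined
    .map(lambda kv: (kv[0], kv[1][0], kv[1][1], list(kv[1][2])))
    .toDF(["rid", "collect_list(col1)", "collect_list(col2)", "collect_set(col3)"])
)
result.show()
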
2 Answers
0 votes

Here is a solution using PandasUDFType.GROUPED_AGG, which can be used in a groupBy clause.

from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from typing import Iterator, Tuple
import pandas as pd

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    ["01001", 12, 41, 10],
    ["01004", 66, 1, 77],
    ["01003", 31, 52, 10],
    ["01004", 27, 11, 91],
    ["01001", 43, 5, 10 ],
    ["01003", 21, 11, 2 ],
    ["01003", -61, 15, 10],
    ["01001", 67, -11, -22],
    ["01004", 21, -26, -13],
    ["01001", 13, -5, 10 ],
    ["01003", 21, 111, -2 ],
    ["01003", 13, 18, 10],
    ["01001", 49, -17, -22],

      ]

df1Columns = ["row_id", "col1", "col2", "col3"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)


print("Given dataframe")
df1.show(n=100, truncate=False)

schema = StructType([StructField('col1', IntegerType()),
                    StructField('col2', IntegerType()),
                    StructField('col3', IntegerType())])


@pandas_udf(ArrayType(ArrayType(IntegerType())), PandasUDFType.GROUPED_AGG)
def custom_sum_udf(col1_series: pd.Series, col2_series: pd.Series, col3_series: pd.Series) -> ArrayType(ArrayType(IntegerType())):
    concat_df = pd.concat([col1_series, col2_series, col3_series], axis=1)
    print("what is the concat df")
    print(concat_df)
    sum_column = concat_df.sum(axis=0).tolist()
    max_column = concat_df.max(axis=0).tolist()
    min_column = concat_df.min(axis=0).tolist()
    print("sum_column", sum_column)
    print("max_column", max_column)
    print("min_column", min_column)
    all_result = [sum_column,  max_column, min_column]

    return all_result

df_new = df1.groupby(F.col("row_id")).agg(
    custom_sum_udf(F.col("col1"), F.col("col2"), F.col("col3")).alias("reduced_columns")
).cache()
print("Printing the column sum, max, min")
df_new.show(n=100, truncate=False)

df_new_sep = df_new.withColumn("sum_over_columns", F.col("reduced_columns").getItem(0))
df_new_sep = df_new_sep.withColumn("max_over_columns", F.col("reduced_columns").getItem(1))
df_new_sep = df_new_sep.withColumn("min_over_columns", F.col("reduced_columns").getItem(2)).drop(F.col("reduced_columns"))
print("Printing the column sum, max, min")
df_new_sep.show(n=100, truncate=False)

Output:

Given dataframe
+------+----+----+----+
|row_id|col1|col2|col3|
+------+----+----+----+
|01001 |12  |41  |10  |
|01004 |66  |1   |77  |
|01003 |31  |52  |10  |
|01004 |27  |11  |91  |
|01001 |43  |5   |10  |
|01003 |21  |11  |2   |
|01003 |-61 |15  |10  |
|01001 |67  |-11 |-22 |
|01004 |21  |-26 |-13 |
|01001 |13  |-5  |10  |
|01003 |21  |111 |-2  |
|01003 |13  |18  |10  |
|01001 |49  |-17 |-22 |
+------+----+----+----+

+------+-----------------------------------------------+
|row_id|reduced_columns                                |
+------+-----------------------------------------------+
|01001 |[[184, 13, -14], [67, 41, 10], [12, -17, -22]] |
|01003 |[[25, 207, 30], [31, 111, 10], [-61, 11, -2]]  |
|01004 |[[114, -14, 155], [66, 11, 91], [21, -26, -13]]|
+------+-----------------------------------------------+

Printing the column sum, max, min
+------+----------------+----------------+----------------+
|row_id|sum_over_columns|max_over_columns|min_over_columns|
+------+----------------+----------------+----------------+
|01001 |[184, 13, -14]  |[67, 41, 10]    |[12, -17, -22]  |
|01003 |[25, 207, 30]   |[31, 111, 10]   |[-61, 11, -2]   |
|01004 |[114, -14, 155] |[66, 11, 91]    |[21, -26, -13]  |
+------+----------------+----------------+----------------+
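
For reference, here is a sketch of how the same GROUPED_AGG pattern could be mapped back onto the rid/col1/col2/col3 frame from the question, assuming the custom UDF logic can be expressed directly over a pandas Series (the function bodies below are placeholders, not the asker's actual UDFs):

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, LongType, StringType
import pandas as pd

@pandas_udf(ArrayType(LongType()), PandasUDFType.GROUPED_AGG)
def agg_col1(col1_series: pd.Series):
    # placeholder for whatever custom_udf_1 would do with the collected values
    return col1_series.tolist()

@pandas_udf(ArrayType(StringType()), PandasUDFType.GROUPED_AGG)
def agg_col3(col3_series: pd.Series):
    # distinct values per group, analogous to collect_set + custom_udf_3
    return col3_series.unique().tolist()

result = df.groupby("rid").agg(
    agg_col1(F.col("col1")).alias("cnt1"),
    agg_col3(F.col("col3")).alias("cnt3"),
)
result.show(truncate=False)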

0 votes

It can be useful to take a step back and think about what you're actually trying to tune, and at what scale. In particular, is your use case really only 100k records? Because if so, Spark is doing a whole pile of things that are completely useless for your particular case: the cluster replaces in-process calls with inter-process and inter-instance communication, shuffles exchange data between hosts, and memory-related tricks such as spilling to disk kick in. In general, at large scale, DataFrames are much faster than RDDs, for a few reasons.

  1. DataFrames are higher-level and more descriptive than RDDs. This lets Spark apply optimizations such as filter predicate pushdown, column pruning, and join-type selection (see the sketch after the link below).

  2. On top of that, the descriptive nature of DataFrames lets Spark compile a custom, efficient program for each query you run. That program does away with raw Scala objects and turns everything into simple binary operations for the CPU to execute.

https://www.databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
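
As a small, hypothetical illustration of the first point (the file path and columns here are made up, and spark is assumed to be an existing SparkSession): with a columnar source such as Parquet, explain() typically shows the pushed filters and a read schema limited to the selected columns right in the scan node.

import pyspark.sql.functions as F

# Hypothetical Parquet dataset; Spark only reads "rid" and "col1" and pushes the filter
# down into the scan, which an equivalent hand-written RDD job would not get for free.
events = spark.read.parquet("/tmp/events.parquet")
small = events.filter(F.col("col1") > 5).select("rid", "col1")
small.explain()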

If you're really clever and know a million different optimizations, you might be able to get close to DataFrame performance with RDDs.

Where DataFrames stop paying off is at the extremes. If the dataset is small enough that no exchange is needed, there's no reason to pay the full sort cost that an exchange incurs, no reason to pay the task-management overhead, and no reason to pay the inter-process communication overhead.
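
As a concrete illustration of that last point (a sketch only, assuming the 100k-row df from the question fits comfortably in driver memory): at this scale, collecting to the driver and aggregating with plain pandas may well beat any distributed plan.

# Pull the ~100k rows to the driver and aggregate locally, with no shuffle involved.
pdf = df.toPandas()
local = pdf.groupby("rid").agg(
    cnt1=("col1", list),
    cnt2=("col2", list),
    cnt3=("col3", lambda s: set(s)),
)
print(local.head())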
