如何优化 pyspark 中的链式多重连接

问题描述 投票:0回答:1

几乎要花几天时间才能跑,甚至连跑都没有

df = (
        a
        .join(b, on='aa', how='left')
        .join(c, on='aa', how='left')
        .join(d, on='aa', how='left')
        .join(e, on='bb', how='left')
        .join(f, on='cc', how='left')
        .join(g, on='dd', how='left')
    )    

数据框的行数

a: 3500000
b: 85000000
c: 420000
d: 2200000
e: 30000
f: 20000
g: 60000

有向无环图 enter image description here

apache-spark join pyspark databricks
1个回答
0
投票

每个Spark数据帧内部都有RDD,我们可以用它来优化查询。 您可以根据上面给出的示例尝试使用 RDD 连接表。

将每个表转换为要连接的列上的 Pair RDD

pair_a = a.rdd.map(lambda x: (x[0], x)) # pair("aa", Row(Record))
pair_b = b.rdd.map(lambda x: (x[1], x)) # pair("aa", Row(Record))
pair_c = c.rdd.map(lambda x: (x[1], x)) # pair("aa", Row(Record))
pair_d = d.rdd.map(lambda x: (x[1], x)) # pair("aa", Row(Record))

加入专栏

join_aa_pair = pair_a.leftOuterJoin(pair_b).leftOuterJoin(pair_c).leftOuterJoin(pair_d)
# ('aa104', (((Row(aa='aa104', name='name:104'), bb123), cc11), dd212)) =>  ('aa104', 'name:104', 'bb123', 'cc11', 'dd212')
transform_aa_pair = join_aa_pair.map(lambda x : (x[1][0][0][0].aa, x[1][0][0][0].name, checkNoneBB(x[1][0][0][1]), checkNoneCC(x[1][0][1]), checkNoneDD(x[1][1])))

def checkNoneBB(x):
    if x is not None:
       return ""+x.bb
    else:
       return "bbNone"

将另一个表转换为pairRDD

pair_e = e.rdd.map(lambda x: (x[1], x)) # pair("bb", Row(Record))
pair_f = f.rdd.map(lambda x: (x[1], x)) # pair("cc", Row(Record))
pair_g = g.rdd.map(lambda x: (x[1], x)) # pair("dd", Row(Record))

将 RDD 转换为 bb 列的 Pair RDD

bb_transform_pair = transform_aa_pair.map(lambda x: (x[2], x)) # pair("bb", Row(Record))
bb_join_pair = bb_transform_pair.leftOuterJoin(pair_e) # left join on bb

将 RDD 转换为 cc 列的 Pair RDD

cc_transform_pair = bb_join_pair.map(lambda x: (x[1][0][3], x[1])) # create pair("cc", Row(Record))
cc_join_pair = cc_transform_pair.leftOuterJoin(pair_f) # left join on cc

将 RDD 转换为 dd 列的 Pair RDD

dd_transform_pair = cc_join_pair.map(lambda x: (x[1][0][0][4], x[1][0][0])) # create pair("dd", Row(Record))
dd_join_pair = dd_transform_pair.leftOuterJoin(pair_g) # left join on dd

dd_join_pair 将包含所有列的连接属性。转换为您的列的正确行记录,提供一个架构以转换回 Spark sql 数据帧。

spark.createDataFrame(dd_join_pair, schema=schema)

这是一种使用RDD进行Join的优化方式。

建议:避免使用所有数据进行分析,尝试过滤掉不必要的数据并减少数据大小。

© www.soinside.com 2019 - 2024. All rights reserved.