如何提升Spark性能

问题描述 投票:0回答:0

我正在处理一个大型数据集,其中包含大小为 (1000,10000) 的向量;我需要从数据集中找出向量 的所有三元组,总方差最多为 τ

我当前的代码可以在本地 5 分钟内完成 (250,10000) 数据集的任务,但对于更大的数据集 (1000,10000) 会失败。我应该怎么做才能提高性能?

这是我目前所拥有的:

def aggregate_variance(v1, v2, v3) -> float:
    lenList = len(v1)
    sumList = []
    for i in range(0, lenList):
        sumList.append(v1[i] + v2[i] + v3[i])
    return np.var(sumList)

def q3(spark_context: SparkContext, rdd: RDD):

    NumPartition = 8
#    NumPartition = 160     # for server (2 workers, each work has 40 cores, so 80 cores in total)
#    NumPartition = 240

    taus = [20, 410]
    tau = spark_context.broadcast(taus)
    rdd_dict = rdd.collectAsMap()
    broadcast_lst = spark_context.broadcast(rdd_dict)
    print(f"first row of rdd:\n {rdd.first()}")

    # cartesian join the keys 
    keys = rdd.keys()
    keys2 = keys.cartesian(keys)
    keys2 = keys2.filter(lambda x: x[0] < x[1])
    keys3 = keys2.cartesian(keys)
    keys3 = keys3.filter(lambda x: x[0][1] < x[1] and x[0][0] < x[1])

    keyRDD = keys3.repartition(NumPartition)
    keyRDD_Cache = keyRDD.cache()
    print(f"first row of keyRDD_Cache:\n {keyRDD_Cache.first()}")

    resultRDD = keyRDD_Cache.map(lambda x: [x[0][0], x[0][1], x[1]]) \
                            .filter(lambda x: 
                                aggregate_variance(
                                    broadcast_lst.value[x[0]], 
                                    broadcast_lst.value[x[1]], 
                                    broadcast_lst.value[x[2]]
                                ) <= tau.value[0])
    

    print(f"resultRDD: {resultRDD.collect()}")
    print(f"count: {resultRDD.count()}")

输出将是:

first row of rdd:                                                               
 ('N3DZ', [52, 4, 55, 54, 26, 61, 28, 78, 94, 18])
first row of keyRDD_Cache:                                                      
 (('N3DZ', 'P7AT'), 'SR03')
resultRDD: [['FO3Y', 'XQL2', 'XWP5'], ['ASGM', 'QWOS', 'VG5W'], ['D9I3', 'LX4X', 'OMFF']]
count: 3 

我正在向 aggregate_variance 广播一个看起来效率不高的大字典,我该如何避免这样做?我应该使用缓存还是重新分区以及在哪里使用?提前致谢

python apache-spark pyspark bigdata rdd
© www.soinside.com 2019 - 2024. All rights reserved.