I'm working with a large dataset of vectors, shape (1000, 10000); I need to find the triples of vectors from the dataset whose aggregate variance is at most a threshold.
My current code finishes the task on a (250, 10000) dataset locally within 5 minutes, but fails on the larger (1000, 10000) dataset. What should I do to improve performance?
Here is what I have so far:
import numpy as np

def aggregate_variance(v1, v2, v3) -> float:
    # Variance of the element-wise sum of the three vectors
    lenList = len(v1)
    sumList = []
    for i in range(0, lenList):
        sumList.append(v1[i] + v2[i] + v3[i])
    return np.var(sumList)
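As a side note, the per-element Python loop above can be replaced by a vectorized NumPy version that computes the same value (a sketch, assuming the inputs are equal-length numeric sequences):

```python
import numpy as np

def aggregate_variance(v1, v2, v3) -> float:
    # Element-wise sum of the three vectors, then the variance of that sum.
    # np.asarray makes this work for plain lists as well as arrays.
    return float(np.var(np.asarray(v1) + np.asarray(v2) + np.asarray(v3)))

print(aggregate_variance([1, 2], [1, 2], [1, 2]))  # var([3, 6]) = 2.25
```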
from pyspark import SparkContext
from pyspark.rdd import RDD

def q3(spark_context: SparkContext, rdd: RDD):
    NumPartition = 8
    # NumPartition = 160  # for server (2 workers, each worker has 40 cores, so 80 cores in total)
    # NumPartition = 240
    taus = [20, 410]
    tau = spark_context.broadcast(taus)
    rdd_dict = rdd.collectAsMap()
    broadcast_lst = spark_context.broadcast(rdd_dict)
    print(f"first row of rdd:\n {rdd.first()}")
    # cartesian join the keys
    keys = rdd.keys()
    keys2 = keys.cartesian(keys)
    keys2 = keys2.filter(lambda x: x[0] < x[1])
    keys3 = keys2.cartesian(keys)
    keys3 = keys3.filter(lambda x: x[0][1] < x[1] and x[0][0] < x[1])
    keyRDD = keys3.repartition(NumPartition)
    keyRDD_Cache = keyRDD.cache()
    print(f"first row of keyRDD_Cache:\n {keyRDD_Cache.first()}")
    resultRDD = keyRDD_Cache.map(lambda x: [x[0][0], x[0][1], x[1]]) \
        .filter(lambda x:
                aggregate_variance(
                    broadcast_lst.value[x[0]],
                    broadcast_lst.value[x[1]],
                    broadcast_lst.value[x[2]]
                ) <= tau.value[0])
    print(f"resultRDD: {resultRDD.collect()}")
    print(f"count: {resultRDD.count()}")
The output will be:
first row of rdd:
('N3DZ', [52, 4, 55, 54, 26, 61, 28, 78, 94, 18])
first row of keyRDD_Cache:
(('N3DZ', 'P7AT'), 'SR03')
resultRDD: [['FO3Y', 'XQL2', 'XWP5'], ['ASGM', 'QWOS', 'VG5W'], ['D9I3', 'LX4X', 'OMFF']]
count: 3
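To make the question reproducible without a cluster, here is a minimal driver-side sketch of the same triple-generation and filter logic, using a toy dictionary in place of the real RDD (the keys, vectors, and threshold below are made up for illustration):

```python
import numpy as np
from itertools import combinations

# Toy stand-in for the broadcast dict: key -> vector (hypothetical data)
data = {
    "A": [1, 2, 3],
    "B": [1, 2, 3],
    "C": [-1, -2, -3],
    "D": [10, 0, 10],
}
tau = 5.0  # hypothetical threshold

def aggregate_variance(v1, v2, v3) -> float:
    return float(np.var(np.asarray(v1) + np.asarray(v2) + np.asarray(v3)))

# combinations() yields each unordered key triple exactly once, which is
# what the two cartesian + filter steps in q3 compute.
triples = [
    (a, b, c)
    for a, b, c in combinations(sorted(data), 3)
    if aggregate_variance(data[a], data[b], data[c]) <= tau
]
print(triples)  # only the triple whose summed vector has variance <= tau
```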
I'm broadcasting a large dictionary for aggregate_variance to look vectors up in, which seems inefficient. How can I avoid that? Should I use cache() or repartition(), and where? Thanks in advance.