I have a df like this:
|1001714437 |[a -> [12 -> 0.9937, 21 -> 0.993, 34 -> 0.9808, 78 -> 0.9311], b -> [123 -> 0.9937, 4532 -> 0.993]]
df.printSchema():
root
|-- id: string (nullable = true)
|-- recs: map (nullable = true)
| |-- key: string
| |-- value: map (valueContainsNull = true)
| | |-- key: string
| | |-- value: double (valueContainsNull = true)
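For reference, a tiny df with the same schema can be built roughly like this (the values are just the sample row above, so the problem is reproducible; this is only a sketch, not my real data loading code):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, MapType

spark = SparkSession.builder.getOrCreate()

# id -> string, recs -> map<string, map<string, double>>, same as the schema above
schema = StructType([
    StructField('id', StringType(), True),
    StructField('recs', MapType(StringType(), MapType(StringType(), DoubleType()), True), True),
])

sample = [(
    '1001714437',
    {'a': {'12': 0.9937, '21': 0.993, '34': 0.9808, '78': 0.9311},
     'b': {'123': 0.9937, '4532': 0.993}},
)]
df = spark.createDataFrame(sample, schema)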
Now I am trying a UDF that computes a new score with some calculations and then builds a string out of it. Here is the code:
import numpy as np
from pyspark.sql import functions as f

def new_score(anchor, save_rate, max_sz=30):
    result_str = ''
    recs = []
    score = []
    if not save_rate:
        return result_str
    m = len(save_rate)
    ptr = [0] * m  # one cursor per outer key of save_rate
    klist = list(save_rate.keys())
    while len(recs) < max_sz:
        t = []
        for idx, k in enumerate(klist):
            key = list(save_rate[k].keys())
            if ptr[idx] < len(key):
                robj = {key[ptr[idx]]: save_rate[k][key[ptr[idx]]]}
                robj['rk'] = k
                robj['index'] = idx
                robj['new_score'] = robj[key[ptr[idx]]] + 0.4 * (len(recs) - ptr[idx])
                t.append(robj)
        if len(t) == 0:
            break
        # Pick the candidate with the highest new score and advance its cursor
        max_obj = max(t, key=lambda x: x['new_score'])
        if list(max_obj.keys())[0] != anchor:
            recs.append(list(max_obj.keys())[0])
            score.append(max_obj['new_score'])
        ptr[max_obj['index']] = ptr[max_obj['index']] + 1
    if not score:
        return result_str
    # Normalizing new score
    max_num = max(score)
    min_num = min(score)
    if max_num != min_num:
        score = [np.round(1 - (x - min_num) / (max_num - min_num), 4) for x in score]
    result = [[rec, float(sc)] for rec, sc in zip(recs, score)]
    result.sort(key=lambda tup: tup[1], reverse=True)
    if len(result) < 6:
        result = list()
    # Creating string out of it
    if len(result) > 0:
        result_str = [x[0] + ':' + str(x[1]) for x in result]
        result_str = ','.join(result_str)
    return result_str
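As a quick sanity check outside Spark, the plain Python function can be called on a single row's maps directly (values taken from the sample row above):

# Local check of the scoring logic, no Spark involved
sample_recs = {
    'a': {'12': 0.9937, '21': 0.993, '34': 0.9808, '78': 0.9311},
    'b': {'123': 0.9937, '4532': 0.993},
}
print(new_score('1001714437', sample_recs))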
new_score = f.udf(new_score)
df = df.withColumn('new', new_score(f.col('id'), f.col('recs')))
Now this runs perfectly fine on 200 records, but when I run the whole thing on 4 million records it takes forever, and after about 8-9 hours my Spark instance gets shut down.
Spark config:
spark_config["spark.executor.memory"] = "8G"
spark_config["spark.executor.memoryOverhead"] = "4G"
spark_config["spark.executor.cores"] = "3"
spark_config["spark.driver.memory"] = "15G"
spark_config["spark.dynamicAllocation.enabled"] = "true"
spark_config["spark.sql.execution.arrow.pyspark.enabled"] = "true"
spark_config["spark.sql.shuffle.partitions"] = "1000"
spark_config["spark.default.parallelism"] = "1000"
spark_config["spark.shuffle.service.enabled"] = "true"
spark_config["spark.dynamicAllocation.minExecutors"] = "50"
spark_config["spark.dynamicAllocation.maxExecutors"] = "600"
Is there anything I can optimize, or any Spark configuration I should increase? I think the resources are enough, since the df is not that big.