UDF runs forever and the job aborts


I have a df like this:

|id        |recs                                                                                               |
|1001714437|[a -> [12 -> 0.9937, 21 -> 0.993, 34 -> 0.9808, 78 -> 0.9311], b -> [123 -> 0.9937, 4532 -> 0.993]]|

df.printSchema():
root
 |-- id: string (nullable = true)
 |-- recs: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: double (valueContainsNull = true)
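
For reference, a one-row df with this exact schema can be rebuilt locally with the sketch below (spark is assumed to be an existing SparkSession; the values come from the sample row above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sample = [(
    "1001714437",
    {"a": {"12": 0.9937, "21": 0.993, "34": 0.9808, "78": 0.9311},
     "b": {"123": 0.9937, "4532": 0.993}},
)]
# DDL schema string matching the printSchema() output shown above
df = spark.createDataFrame(sample, "id string, recs map<string, map<string, double>>")
df.show(truncate=False)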

Now I am trying a udf that computes a new score and then builds a string out of it. Here is the code:

import numpy as np
from pyspark.sql import functions as f

def new_score(anchor, save_rate, max_sz=30):

    result_str = ''
    recs = []
    score = []

    # Nothing to score for an empty / null recs map
    if not save_rate:
        return result_str

    m = len(save_rate)
    ptr = [0] * m  # one read pointer per outer key of recs

    klist = list(save_rate.keys())

    # Merge the inner maps, picking the highest re-scored candidate each round
    while len(recs) < max_sz:
        t = []
        for idx, k in enumerate(klist):
            key = list(save_rate[k].keys())
            if ptr[idx] < len(key):
                robj = {key[ptr[idx]]: save_rate[k][key[ptr[idx]]]}
                robj['rk'] = k
                robj['index'] = idx
                robj['new_score'] = robj[key[ptr[idx]]] + 0.4 * (len(recs) - ptr[idx])
                t.append(robj)

        if len(t) == 0:
            break

        max_obj = max(t, key=lambda x: x['new_score'])
        # Take the best candidate unless it is the anchor id itself
        if list(max_obj.keys())[0] != anchor:
            recs.append(list(max_obj.keys())[0])
            score.append(max_obj['new_score'])
            ptr[max_obj['index']] = ptr[max_obj['index']] + 1

    # Normalizing the new score into [0, 1]
    max_num = max(score)
    min_num = min(score)

    if max_num != min_num:
        score = [np.round(1 - (x - min_num) / (max_num - min_num), 4) for x in score]

    result = [[rec, float(sc)] for rec, sc in zip(recs, score)]
    result.sort(key=lambda tup: tup[1], reverse=True)

    # Drop the whole list if fewer than 6 recommendations survived
    if len(result) < 6:
        result = list()

    # Creating the output string: "rec1:score1,rec2:score2,..."
    if len(result) > 0:
        result_str = [x[0] + ':' + str(x[1]) for x in result]
        result_str = ','.join(result_str)

    return result_str
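
Before wiring it into Spark, a quick sanity check against a plain Python dict (values taken from the sample row above) looks like this; it calls the function directly, not the UDF:

sample_recs = {"a": {"12": 0.9937, "21": 0.993, "34": 0.9808, "78": 0.9311},
               "b": {"123": 0.9937, "4532": 0.993}}
print(new_score("1001714437", sample_recs))  # prints a comma-separated "rec:score" string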

new_score = f.udf(new_score)
df = df.withColumn('new', new_score(f.col('id'), f.col('recs')))

Now this runs perfectly fine on 200 records, but when I run the whole thing on 4 million records it takes forever, and after about 8-9 hours my Spark instance gets shut down.

Spark configuration:

spark_config["spark.executor.memory"] = "8G"
spark_config["spark.executor.memoryOverhead"] = "4G"
spark_config["spark.executor.cores"] = "3"
spark_config["spark.driver.memory"] = "15G"
spark_config["spark.dynamicAllocation.enabled"] = "true"
spark_config["spark.sql.execution.arrow.pyspark.enabled"] = "true"
spark_config["spark.sql.shuffle.partitions"] = "1000"
spark_config["spark.default.parallelism"] = "1000"
spark_config["spark.shuffle.service.enabled"] = "true"
spark_config["spark.dynamicAllocation.minExecutors"] = "50"
spark_config["spark.dynamicAllocation.maxExecutors"] = "600"

Is there anything I can optimize, or any Spark configuration I should increase? I think the resources are sufficient, since the df isn't that big.

python performance apache-spark pyspark user-defined-functions