有没有一种简单的方法可以将欧氏距离列添加到现有的 PySpark 数据帧?

问题描述 投票:0回答:1

令我惊讶的是,我无法找到一种简单的方法来使用外部数组和 pyspark 列中的数组来根据欧几里得距离创建新列。

假设我有以下数据框和向量:

rdd = sc.parallelize([(0,[0.1, 0.2, 0.3]), (1,[0.2, 0.3, 0.1]), (2,[0.1, 0.3, 0.2])])
df = sqlContext.createDataFrame(rdd, ["id", "values"])

vector = [0.3, 0.2, 0.1]

我想简单地计算给定的

vector
与每行中找到的
values
之间的欧几里德距离,然后将其设为新列。我对 pyspark 相当陌生,但一直无法找到一种方法来做到这一点。这就是我的直觉引导我的地方:


# define udf to calculate euclidean distance
def euclidean_distance(values, query_vector):
    return sqrt(sum((v - q) ** 2 for v, q in zip(values, query_vector)))

udf_euclidean_distance = F.udf(euclidean_distance)

# pass column through udf
df = df.withColumn('similarity_score', 
                                       udf_euclidean_distance(col('values'), lit(vector)))

但是,我最终遇到了错误

feature not supported

如果有任何关于如何正确完成此操作的想法,我将不胜感激。谢谢!

python apache-spark pyspark data-science
1个回答
0
投票

避免在此类简单表达式中使用“udf”。 UDF 显着降低了 pySpark 性能。而是使用原生 Spark 函数:

from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as f


sc = SparkContext('local', 'test')
sqlContext = SQLContext(sc)

rdd = sc.parallelize([(0,[0.1, 0.2, 0.3]), (1,[0.2, 0.3, 0.1]), (2,[0.1, 0.3, 0.2])])
df = sqlContext.createDataFrame(rdd, ["id", "values"])

vector = [0.3, 0.2, 0.1]

func_mse = 0
for i in range(len(vector)):
    func_mse += f.pow(df['values'][i]-vector[i],2)

func_rmse = f.pow(func_mse, 0.5)

df = df.withColumn('rmse', func_rmse)

print(df.head(3))
© www.soinside.com 2019 - 2024. All rights reserved.