令我惊讶的是,我无法找到一种简单的方法来使用外部数组和 pyspark 列中的数组来根据欧几里得距离创建新列。
假设我有以下数据框和向量:
rdd = sc.parallelize([(0,[0.1, 0.2, 0.3]), (1,[0.2, 0.3, 0.1]), (2,[0.1, 0.3, 0.2])])
df = sqlContext.createDataFrame(rdd, ["id", "values"])
vector = [0.3, 0.2, 0.1]
我想简单地计算给定的
vector
与每行中找到的 values
之间的欧几里德距离,然后将其设为新列。我对 pyspark 相当陌生,但一直无法找到一种方法来做到这一点。这就是我的直觉引导我的地方:
# define udf to calculate euclidean distance
def euclidean_distance(values, query_vector):
return sqrt(sum((v - q) ** 2 for v, q in zip(values, query_vector)))
udf_euclidean_distance = F.udf(euclidean_distance)
# pass column through udf
df = df.withColumn('similarity_score',
udf_euclidean_distance(col('values'), lit(vector)))
但是,我最终遇到了错误
feature not supported
。
如果有任何关于如何正确完成此操作的想法,我将不胜感激。谢谢!
避免在此类简单表达式中使用“udf”。 UDF 显着降低了 pySpark 性能。而是使用原生 Spark 函数:
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
sc = SparkContext('local', 'test')
sqlContext = SQLContext(sc)
rdd = sc.parallelize([(0,[0.1, 0.2, 0.3]), (1,[0.2, 0.3, 0.1]), (2,[0.1, 0.3, 0.2])])
df = sqlContext.createDataFrame(rdd, ["id", "values"])
vector = [0.3, 0.2, 0.1]
func_mse = 0
for i in range(len(vector)):
func_mse += f.pow(df['values'][i]-vector[i],2)
func_rmse = f.pow(func_mse, 0.5)
df = df.withColumn('rmse', func_rmse)
print(df.head(3))