我正在尝试获取 Spark 数据框中不同列中两个向量之间的欧几里德距离。我需要在 Spark 上完成此操作。我花了很多时间尝试做到这一点,但无法弄清楚。
下面是与我的情况类似的示例代码。
import math

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql.functions import expr
from pyspark.sql import SparkSession, types as T, functions as F

# Initialize Spark session
spark = SparkSession.builder \
    .appName("euclidian distance") \
    .getOrCreate()

# Sample data: each row holds a pair of 2-D dense vectors.
data = [(Vectors.dense([1.0, 2.0]), Vectors.dense([3.0, 4.0])),
        (Vectors.dense([5.0, 6.0]), Vectors.dense([7.0, 8.0])),
        (Vectors.dense([9.0, 10.0]), Vectors.dense([11.0, 12.0]))]

# Create DataFrame
df = spark.createDataFrame(data, ["vector1", "vector2"])


def ed_vectors(v1, v2):
    """Return the Euclidean distance between two ml DenseVectors.

    The UDF receives the column values already as DenseVector objects,
    so no re-wrapping with Vectors.dense is needed. squared_distance
    gives the squared L2 norm; take the square root to get the actual
    Euclidean distance (the original code returned the squared value).
    """
    return float(math.sqrt(v1.squared_distance(v2)))


# Register for SQL use (optional) AND wrap for the DataFrame API.
# The original code called the bare Python function on Column objects,
# which fails: a udf wrapper is required for DataFrame-API calls.
spark.udf.register("ed_vectors_udf", ed_vectors, T.DoubleType())
ed_vectors_udf = F.udf(ed_vectors, T.DoubleType())

# Compute the distance column (the original used the undefined name `fn`;
# the functions module is imported as `F`).
df = df.withColumn("distance", ed_vectors_udf(F.col("vector1"), F.col("vector2")))

# Show DataFrame with the distance result
df.show(5)
看看这个:
import pyspark.sql.functions as f

# Rows of paired coordinate arrays (plain Spark array columns, no ml Vectors).
rows = [
    ([1, 2], [3, 4]),
    ([5, 6], [7, 8]),
    ([9, 10], [11, 12]),
]
df = spark.createDataFrame(rows, ['vector1', 'vector2'])

# Pure SQL higher-order functions, no Python UDF:
#   transform  -> element-wise squared differences (vector2[i] - vector1[i])^2
#   aggregate  -> sum those squares, starting from a double-typed 0
#   pow(.., 0.5) -> square root, yielding the Euclidean distance
df = df.withColumn(
    'distance',
    f.expr('pow(aggregate(transform(vector1, (x, i) -> pow(vector2[i] - x, 2)), cast(0 as double), (acc, x) -> acc + x), 0.5)'),
)
输出为:
+-------+--------+------------------+
|vector1| vector2| distance|
+-------+--------+------------------+
| [1, 2]| [3, 4]|2.8284271247461903|
| [5, 6]| [7, 8]|2.8284271247461903|
|[9, 10]|[11, 12]|2.8284271247461903|
+-------+--------+------------------+