我有一个包含两列的 pyspark DataFrame。一种是浮点数,另一种是数组。 我知道数组每行的长度与行数的长度相同。 我想在 DataFrame 中创建一个新列,每一行的结果将是数组和列的点积。
例如,对于以下 DataFrame:
+------------------------------------------------------------+-----+
|weights |value|
+------------------------------------------------------------+-----+
|[0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0]|34 |
|[5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0]|50 |
|[4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0]|56 |
|[3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0]|45 |
|[2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0]|34 |
|[1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0]|36 |
|[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0]|45 |
|[1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0]|50 |
|[2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0]|57 |
|[3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0]|39 |
|[4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0]|48 |
|[5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0]|39 |
+------------------------------------------------------------+-----+
我想添加一个新列“结果”,每行的值将是:
numpy.dot(row['weights'] * [34, 50, 56, 45, 34, 36, 45, 50, 57, 39, 48, 39])
谢谢。
您需要创建一个 udf(用户定义函数),类似于函数矢量化的完成方式。这是一个例子:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import numpy as np
def dotting(weights, values):
return np.dot(weights, values)
dotting_udf = udf(dotting, DoubleType())
values = [34, 50, 56, 45, 34, 36, 45, 50, 57, 39, 48, 39]
row = row.withColumn('weights', dotting_udf(df['weights'], values))
参见https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html