将 pyspark 列与每一行的数组相乘

问题描述 投票:0回答:1

我有一个包含两列的 pyspark DataFrame。一种是浮点数,另一种是数组。 我知道数组每行的长度与行数的长度相同。 我想在 DataFrame 中创建一个新列,每一行的结果将是数组和列的点积。

例如,对于以下 DataFrame:

+------------------------------------------------------------+-----+
|weights                                                     |value|
+------------------------------------------------------------+-----+
|[0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0]|34   |
|[5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0]|50   |
|[4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0]|56   |
|[3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0]|45   |
|[2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0]|34   |
|[1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0]|36   |
|[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0, 1.0]|45   |
|[1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0, 2.0]|50   |
|[2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0, 3.0]|57   |
|[3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0, 4.0]|39   |
|[4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 5.0]|48   |
|[5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 0.0]|39   |
+------------------------------------------------------------+-----+

我想添加一个新列“结果”,每行的值将是:

numpy.dot(row['weights'] * [34, 50, 56, 45, 34, 36, 45, 50, 57, 39, 48, 39]) 

谢谢。

python pyspark
1个回答
0
投票

您需要创建一个 udf(用户定义函数),类似于函数矢量化的完成方式。这是一个例子:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import numpy as np

def dotting(weights, values):
    return np.dot(weights, values)

dotting_udf = udf(dotting, DoubleType())

values = [34, 50, 56, 45, 34, 36, 45, 50, 57, 39, 48, 39]
row = row.withColumn('weights', dotting_udf(df['weights'], values))

参见https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html

© www.soinside.com 2019 - 2024. All rights reserved.