将两个不同类型的pyspark数据框列相乘(数组[double] vs double),而不用轻拂

问题描述 投票:0回答:1

我遇到的问题与问here相同,但是我需要在pyspark中解决问题并且没有微风。

例如,如果我的pyspark数据帧看起来像这样:

user    |  weight  |  vec
"u1"    | 0.1      | [2, 4, 6]
"u1"    | 0.5      | [4, 8, 12]
"u2"    | 0.5      | [20, 40, 60]

其中列权重类型为double,列vec类型为Array [Double],我想获取每个用户向量的加权总和,以便获得如下所示的数据帧:

user    |  wsum
"u1"    | [2.2, 4.4, 6.6]
"u2"    | [10, 20, 30]

为此,我尝试了以下操作:

df.groupBy('user').agg((F.sum(df.vec* df.weight)).alias("wsum"))

但是由于vec列和weight列具有不同的类型,因此失败。

我如何不费吹灰之力地解决此错误?

python pyspark pyspark-sql pyspark-dataframes
1个回答
0
投票

正在使用Spark 2.4提供的使用高阶函数transform的途中:

# get size of vec array
n = df.select(size("vec")).first()[0]

# transform each element of the vec array
transform_expr = "transform(vec, x -> x * weight)"

df.withColumn("weighted_vec", expr(transform_expr)) \
  .groupBy("user").agg(array(*[sum(col("weighted_vec")[i]) for i in range(n)]).alias("wsum"))\
  .show()

给予:

+----+------------------+
|user|              wsum|
+----+------------------+
|  u1|   [2.2, 4.4, 6.6]|
|  u2|[10.0, 20.0, 30.0]|
+----+------------------+

对于Spark <2.4,使用for理解将每个元素乘以weight列,如下所示:

df.withColumn("weighted_vec", array(*[col("vec")[i] * col("weight") for i in range(n)])) \
  .groupBy("user").agg(array(*[sum(col("weighted_vec")[i]) for i in range(n)]).alias("wsum")) \
  .show()
© www.soinside.com 2019 - 2024. All rights reserved.