How can I get the L2 norm of an array-type column in PySpark?


I have a PySpark DataFrame:

df1 = spark.createDataFrame([
    ("u1", [0, 1, 2]),
    ("u1", [1, 2, 3]),
    ("u2", [2, 3, 4]),

    ],
    ['user_id', 'features'])

print(df1.printSchema())
df1.show(truncate=False)

Output:

root
 |-- user_id: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: long (containsNull = true)

None
+-------+---------+
|user_id|features |
+-------+---------+
|u1     |[0, 1, 2]|
|u1     |[1, 2, 3]|
|u2     |[2, 3, 4]|
+-------+---------+

I want to normalize the features by their L2 norm, so I wrote a UDF:

def norm_2_func(features):
    return features/np.linalg.norm(features, 2)

norm_2_udf = udf(norm_2_func, ArrayType(FloatType()))
df2 = df1.withColumn('l2_features', norm_2_udf(F.col('features')))

But it throws an error. How can I achieve this?

The expected output is:

+-------+---------+----------------------+
|user_id|features |               L2_norm|
+-------+---------+----------------------+
|u1     |[0, 1, 2]| [0.000, 0.447, 0.894]|
|u1     |[1, 2, 3]| [0.267, 0.534, 0.801]|
|u2     |[2, 3, 4]| [0.371, 0.557, 0.742]|
+-------+---------+----------------------+
Tags: dataframe, apache-spark, pyspark, apache-spark-sql
2 Answers

NumPy arrays contain NumPy data types, which need to be converted to plain Python data types (float/int, etc.) before being returned from the UDF:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, FloatType

def norm_2_func(features):
    return [float(i) for i in features/np.linalg.norm(features, 2)]
    # you can also use
    # return list(map(float, features/np.linalg.norm(features, 2)))

norm_2_udf = F.udf(norm_2_func, ArrayType(FloatType()))
df2 = df1.withColumn('l2_features', norm_2_udf(F.col('features')))

df2.show(truncate=False)
+-------+---------+-----------------------------------+
|user_id|features |l2_features                        |
+-------+---------+-----------------------------------+
|u1     |[0, 1, 2]|[0.0, 0.4472136, 0.8944272]        |
|u1     |[1, 2, 3]|[0.26726124, 0.5345225, 0.80178374]|
|u2     |[2, 3, 4]|[0.37139067, 0.557086, 0.74278134] |
+-------+---------+-----------------------------------+
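
If you prefer to avoid a Python UDF altogether, here is a minimal sketch using Spark's built-in higher-order functions instead (assuming Spark 3.1+ for F.transform and F.aggregate, and the same df1/features names as in the question; untested):

import pyspark.sql.functions as F

# L2 norm of the array: square each element, sum the squares, take the square root.
l2 = F.sqrt(
    F.aggregate(
        F.transform('features', lambda x: x * x),  # square each element
        F.lit(0.0),                                # accumulator starts at 0.0
        lambda acc, x: acc + x                     # sum of squares
    )
)

# Divide every element of the array by that row's L2 norm.
df2 = df1.withColumn('l2_features', F.transform('features', lambda x: x / l2))

This keeps the computation inside the JVM and avoids the serialization overhead of a Python UDF.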


Here is another way to do it, by putting all of the columns into an array and computing the norm with Spark's built-in functions:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from operator import add

spark = SparkSession.builder.getOrCreate()

# Sample data
df = spark.createDataFrame([(4, 3,), (5, 7,)], schema="x int, y int")


def norm(cols):
    # Collect the columns into an array, square each element,
    # sum the squares with F.reduce, and take the square root.
    # (F.reduce is only available in recent Spark releases;
    # F.aggregate is the older equivalent.)
    return (
        F.sqrt(
            F.reduce(
                F.transform(F.array(*[F.col(c) for c in cols]), lambda x: x**2),
                F.lit(0.0),
                add
            )
        )
    )


# Transform
df.select(norm(["x", "y"]).alias("norm")).show()
