PySpark: process an array column with a UDF and return another array



Below is my input:

docID  shingles
D1     [23,25,39,59]
D2     [34,45,65]

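I want to generate a new column called hashes by processing the shingles array column. For example, I want to extract the min and max values (this is only an example to show that I want a fixed-length array column; I do not actually want to find the min or max).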

docID  shingles         hashes
D1     [23,25,39,59]    [23,59]
D2     [34,45,65]       [34,65]

I created a UDF as follows:

def generate_minhash_signatures(shingles, coeffA, coeffB):
    signature = []
    minHashCode = nextPrime + 1
    maxHashCode = 0
    for shingleID in shingles:
        if shingleID < minHashCode:
            minHashCode = shingleID
        if shingleID > maxHashCode:
            maxHashCode = shingleID
    return [minHashCode, maxHashCode]

minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))
df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()

But it gives the following error:

TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Actual implementation:

def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
    signature = []
    for i in range(0, numHashes):
        minHashCode = nextPrime + 1
        for shingleID in shingles:
            hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

            if hashCode < minHashCode:
                minHashCode = hashCode

        signature.append(minHashCode)
    return signature
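
The hashing loop above can be sanity-checked outside Spark before wrapping it in a UDF. The following is a minimal sketch in plain Python, run on the two sample rows. The values of `next_prime`, `coeff_a`, and `coeff_b` are small made-up numbers chosen for illustration (not the ones from the question), and `next_prime` is passed as a parameter here instead of being read from a global.

```python
def generate_minhash_signatures(shingles, coeff_a, coeff_b, num_hashes, next_prime):
    """Return one minimum hash value per hash function (length is always num_hashes)."""
    signature = []
    for i in range(num_hashes):
        # Smallest value of (a * x + b) mod p over the shingle IDs of this row.
        # An empty row falls back to next_prime + 1, like the loop in the question.
        min_hash = min(
            ((coeff_a[i] * s + coeff_b[i]) % next_prime for s in shingles),
            default=next_prime + 1,
        )
        signature.append(min_hash)
    return signature


# Hypothetical parameters, for illustration only.
next_prime = 101
coeff_a = [3, 7]
coeff_b = [5, 11]

rows = {"D1": [23, 25, 39, 59], "D2": [34, 45, 65]}
signatures = {
    doc_id: generate_minhash_signatures(s, coeff_a, coeff_b, len(coeff_a), next_prime)
    for doc_id, s in rows.items()
}
print(signatures)  # {'D1': [21, 20], 'D2': [6, 23]}
```

Every row yields a list of exactly `num_hashes` values regardless of how many shingles it has, which is the fixed-length array column the question asks for.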

1 Answer

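The udf expects all three arguments to be columns. Most likely coeffA and coeffB are plain Python values rather than columns, so you need to convert them to column objects with lit: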

import pyspark.sql.functions as f
df.withColumn('min_max_hash', minhash_udf(f.col("shingles"), f.lit(coeffA), f.lit(coeffB)))

If coeffA and coeffB are lists, use f.array to create the literals as follows:

df.withColumn('min_max_hash',
  minhash_udf(
    f.col("shingles"),
    f.array(*map(f.lit, coeffA)),  # one literal column per list element
    f.array(*map(f.lit, coeffB))
  )
)

Or separate the column arguments from the non-column arguments as follows:

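# coeffA, coeffB and numHashes are plain Python values captured by the closure;
# only shingles is passed to the UDF as a column.
def generate_minhash_signatures(coeffA, coeffB, numHashes):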
    def generate_minhash_signatures_inner(shingles):
        signature = []
        for i in range(0, numHashes):
            minHashCode = nextPrime + 1
            for shingleID in shingles:
                hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

                if hashCode < minHashCode:
                    minHashCode = hashCode

            signature.append(minHashCode)
        return signature
    return f.udf(generate_minhash_signatures_inner, ArrayType(IntegerType()))

Then you can call the function as:

df.withColumn('min_max_hash', generate_minhash_signatures(coeffA, coeffB, numHashes)("shingles"))
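
The factory approach works because the inner function closes over the plain Python values, so the function handed to Spark takes only the column value. Below is a minimal sketch of that closure pattern without Spark. The name `make_minhash_fn` and the parameter values are hypothetical, and `next_prime` is passed in explicitly rather than read from a global; in a real job the returned function would be wrapped with f.udf as in the answer above.

```python
import inspect


def make_minhash_fn(coeff_a, coeff_b, next_prime):
    """Build a one-argument function with the coefficients baked in."""
    num_hashes = len(coeff_a)

    def minhash(shingles):
        # coeff_a, coeff_b, next_prime and num_hashes come from the enclosing scope.
        return [
            min(
                ((coeff_a[i] * s + coeff_b[i]) % next_prime for s in shingles),
                default=next_prime + 1,
            )
            for i in range(num_hashes)
        ]

    return minhash


# Hypothetical parameters, for illustration only.
minhash = make_minhash_fn([3, 7], [5, 11], 101)

# The returned callable accepts only the per-row value.
param_names = list(inspect.signature(minhash).parameters)
result = minhash([23, 25, 39, 59])
print(param_names, result)  # ['shingles'] [21, 20]
```

The design choice here is that the coefficients never have to be turned into literal columns at all, which avoids the "not a string or column" error for list arguments.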