Problem with MinHashLSH on PySpark


I am trying to run a text-similarity analysis with PySpark. After vectorizing my text input with CountVectorizer (vocabSize=5000), I run approxSimilarityJoin on the data. When I do, I get an error about non-zero values in the input vectors.

The error I get is the following:

org.apache.spark.SparkException: Failed to execute user defined function(LSHModel$Lambda$3828/1942581113: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
    at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:101)
    ... 20 more
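
If it helps, the requirement failure is reproducible in isolation with a single all-zero vector. Here is a minimal sketch (the local SparkSession and toy data are my own assumptions, not the real pipeline):

from pyspark.sql import SparkSession
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.master("local[1]").getOrCreate()

toy = spark.createDataFrame(
    [(0, Vectors.sparse(5, [1, 3], [1.0, 1.0])),  # fine: two non-zero entries
     (1, Vectors.sparse(5, [], []))],             # all-zero: nothing to hash
    ["id", "features"],
)

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
toy_model = mh.fit(toy)
toy_model.transform(toy).show()  # raises: Must have at least 1 non zero entry.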

I tried to make sure that every input vector has at least one non-zero value with the following filter:

samdf = samdf.filter(F.udf(lambda vec: vec.numNonzeros() > 0, T.BooleanType())("features"))

I also made sure that no input vector contains any None values by running:

samdf = samdf.filter(~F.udf(lambda x: None in x, T.BooleanType())("features"))
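
To double-check whether any all-zero vectors survive these filters, a quick diagnostic count right before the LSH stage (assuming samdf already has the features column at that point) is:

zero_rows = samdf.filter(
    F.udf(lambda vec: vec.numNonzeros() == 0, T.BooleanType())("features")
).count()
print(f"all-zero feature vectors remaining: {zero_rows}")  # expect 0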

My code is as follows:

from pyspark.sql import functions as F, types as T
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, MinHashLSH

tokenizer = Tokenizer(inputCol="clean_text", outputCol="tokens")
samdf = tokenizer.transform(samdf)
stopwords = StopWordsRemover(inputCol="tokens", outputCol="tokens_no_stop")
samdf = stopwords.transform(samdf)
samdf = samdf.drop("tokens")
samdf = samdf.withColumnRenamed("tokens_no_stop", "tokens")
samdf = samdf.filter(F.size(F.col("tokens")) > 0)  # drop rows with no tokens
samdf = samdf.filter(~F.udf(lambda x: "" in x, T.BooleanType())("tokens"))  # drop rows with empty-string tokens

ctvec = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=5000)
samdf = ctvec.fit(samdf).transform(samdf)

lhs = MinHashLSH(
    inputCol="features",
    outputCol="hashes",
    numHashTables=5
)
model = lhs.fit(samdf)
samdf = model.transform(samdf)

dupedf = (
    model
    .approxSimilarityJoin(samdf, samdf, 0.3, distCol="jsim")
    .filter("datasetA.id_str < datasetB.id_str")
    .select(
        F.col("datasetA.id_str").alias("idA"),
        F.col("datasetB.id_str").alias("idB"),
        F.col("jsim")
    )
)
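
One detail I am not sure about is filter placement. Because CountVectorizer is capped at vocabSize=5000, a document whose tokens are all out-of-vocabulary still vectorizes to an all-zero sparse vector even though its token list is non-empty, so the numNonzeros filter presumably has to run after the ctvec stage and before MinHashLSH. A sketch of that placement (the non_empty name is mine):

non_empty = F.udf(lambda vec: vec.numNonzeros() > 0, T.BooleanType())
samdf = ctvec.fit(samdf).transform(samdf)      # vectorize first
samdf = samdf.filter(non_empty("features"))    # then drop all-zero vectors
# ...then fit MinHashLSH and call approxSimilarityJoin as above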
Tags: apache-spark, pyspark, apache-spark-mllib