How do I implement ranking metrics in PySpark?


I am new to PySpark. I am trying to implement ALS (Alternating Least Squares matrix factorization) with the Python pyspark.mllib.recommendation package for recommendation purposes. According to the PySpark documentation, I should use ranking metrics to evaluate a system trained on implicit feedback. Unfortunately those docs have not been updated for the Python API, and when I tried to implement it myself I ran into several problems with the RDD types. Please help me find the error. I am not sure whether I should call .rdd on the result of createDataFrame or use some other function to build the RDD...

import pandas as pd
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RankingMetrics

# spark: an existing SparkSession

def build_model_Als(self):
    data = self.load_from_redis()
    self.dataframe = pd.DataFrame({"user": data[0:, 0], "item": data[0:, 1], "rate": data[0:, 2]})

    train = self.dataframe.sample(frac=0.8, random_state=99)
    test = self.dataframe.loc[~self.dataframe.index.isin(train.index), :]

    ts = test.drop(columns=['rate'])
    ps = test.drop(columns=['user'])

    ratings = spark.createDataFrame(self.dataframe).rdd
    testdata = spark.createDataFrame(ts).rdd

    self.model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01, nonnegative=True)
    predictions = self.model.predictAll(testdata)
    # join the ratings with the predictions and pass the result to RankingMetrics
    ratesAndPreds = ratings.join(predictions)
    metrics = RankingMetrics(ratesAndPreds)
    print("Mean average precision =", metrics.meanAveragePrecision)

This is the error:

py4j.protocol.Py4JJavaError: An error occurred while calling o207.meanAveragePrecision.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 213.0 failed 1 times, most recent failure: Lost task 0.0 in stage 213.0 (TID 273, localhost, executor driver): java.lang.ClassCastException: java.lang.Long cannot be cast to scala.collection.Seq
    at org.apache.spark.sql.Row$class.getSeq(Row.scala:283)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166)
    at org.apache.spark.mllib.api.python.PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070)
    at org.apache.spark.mllib.api.python.PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.util.StatCounter.merge(StatCounter.scala:55)
    at org.apache.spark.util.StatCounter.<init>(StatCounter.scala:37)
    at org.apache.spark.util.StatCounter$.apply(StatCounter.scala:158)
    at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$stats$1$$anonfun$apply$1.apply(DoubleRDDFunctions.scala:43)
    at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$stats$1$$anonfun$apply$1.apply(DoubleRDDFunctions.scala:43)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

python pyspark collaborative-filtering recommender-systems
1 Answer

I tried several approaches, both with the RDD-based and with the DataFrame-based PySpark APIs, to build, fit and evaluate the model. In the end I found that the join function does not help to build ratesAndPreds, because RankingMetrics needs an RDD of (predicted items, ground-truth items) pairs, which can be built with spark.sparkContext.parallelize. I changed the function as follows:

import logging
import numpy as np
import pandas as pd
from pyspark.ml.recommendation import ALS
from pyspark.mllib.evaluation import RankingMetrics

# spark: an existing SparkSession

def build_model_Als(self):
    data = self.load_from_redis()
    self.dataframe = pd.DataFrame({"user": data[0:, 0], "item": data[0:, 1], "rate": data[0:, 2]})
    train = self.dataframe.sample(frac=0.8, random_state=99)
    test = self.dataframe.loc[~self.dataframe.index.isin(train.index), :]
    ts = test.drop(columns=['rate'])
    logging.info("start train")

    # DataFrame-based ALS (pyspark.ml) trained on the implicit feedback
    als = ALS(rank=10, maxIter=10, regParam=0.01, userCol="user", itemCol="item", implicitPrefs=True,
              ratingCol="rate", coldStartStrategy="drop", nonnegative=True)
    self.model = als.fit(spark.createDataFrame(train))
    logging.info("done")

    # Build true_list: user -> list of items the user actually has in the test set
    global_user = None
    user_list = np.array([], dtype=np.float64)
    true_list = dict()
    testItems = list()
    for row in test.iterrows():

        if row[1]['user'] != global_user:
            user_list = np.append(user_list, row[1]['user'])
            testItems = [int(row[1]['item'])]   # start a fresh item list for the new user
            global_user = row[1]['user']
        else:
            testItems.append(int(row[1]['item']))
        true_list[global_user] = testItems

    # Top-30 recommendations for the test users, paired with their ground-truth items
    pandasDf = pd.DataFrame({'user': user_list})
    sub_user = spark.createDataFrame(pandasDf)
    labelsList = list()
    for user, items in self.model.recommendForUserSubset(sub_user, 30).collect():

        predict_items = [i.item for i in items]
        labelsList.append((predict_items, true_list[user]))

    # RankingMetrics expects an RDD of (predicted items, ground-truth items) pairs
    labels = spark.sparkContext.parallelize(labelsList)
    metrics = RankingMetrics(labels)
    print(metrics.meanAveragePrecision)

The point is that spark.sparkContext.parallelize returns the right kind of RDD for RankingMetrics.meanAveragePrecision to intersect each predicted list with its ground-truth list, while join does not. For example, spark.sparkContext.parallelize([([1,2,3,4],[1,2,12,13,1]),]) is a proper RDD for meanAveragePrecision, and the result is 40%.
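To make that last evaluation step concrete, here is a minimal standalone sketch; it assumes an already created SparkSession named spark, the variable name prediction_and_labels is mine, and the item IDs are just the illustrative values from the example above:

    from pyspark.mllib.evaluation import RankingMetrics

    # Each element is one user's (predicted items, ground-truth items) pair,
    # using only the illustrative IDs from the example above.
    prediction_and_labels = spark.sparkContext.parallelize([
        ([1, 2, 3, 4], [1, 2, 12, 13, 1]),
    ])

    metrics = RankingMetrics(prediction_and_labels)
    print(metrics.meanAveragePrecision)  # mean average precision over all pairs
    print(metrics.precisionAt(4))        # precision at cut-off 4

Each pair stands for one user: the first list is what the model recommended, in ranked order, and the second list is what that user actually interacted with in the test set.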
