I'm new to PySpark. I'm trying to implement ALS (alternating least squares matrix factorization) for recommendation purposes using Python and the pyspark.mllib.recommendation package. According to the PySpark documentation, I should use ranking metrics to evaluate a system trained on implicit feedback. Unfortunately, the docs are not up to date on the Python side, and when I tried to implement this myself I ran into several problems with RDD types. Please help me find the mistake. I'm not sure whether I should call .rdd on the result of createDataFrame, or whether I should use another function to build the RDD...
def build_model_Als(self):
    data = self.load_from_redis()
    self.dataframe = pd.DataFrame({"user": data[0:, 0], "item": data[0:, 1], "rate": data[0:, 2]})
    train = self.dataframe.sample(frac=0.8, random_state=99)
    test = self.dataframe.loc[~self.dataframe.index.isin(train.index), :]
    ts = test.drop(columns=['rate'])
    ps = test.drop(columns=['user'])
    ratings = spark.createDataFrame(self.dataframe).rdd
    testdata = spark.createDataFrame(ts).rdd
    self.model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01, nonnegative=True)
    predictions = self.model.predictAll(testdata)
    ratesAndPreds = ratings.join(predictions)
    metrics = RankingMetrics(ratesAndPreds)
    print("Mean average precision =", metrics.meanAveragePrecision)
Here is the error:
py4j.protocol.Py4JJavaError: An error occurred while calling o207.meanAveragePrecision.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 213.0 failed 1 times, most recent failure: Lost task 0.0 in stage 213.0 (TID 273, localhost, executor driver): java.lang.ClassCastException: java.lang.Long cannot be cast to scala.collection.Seq
	at org.apache.spark.sql.Row$class.getSeq(Row.scala:283)
	at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.util.StatCounter.merge(StatCounter.scala:55)
	at org.apache.spark.util.StatCounter.<init>(StatCounter.scala:37)
	at org.apache.spark.util.StatCounter$.apply(StatCounter.scala:158)
	at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$stats$1$$anonfun$apply$1.apply(DoubleRDDFunctions.scala:43)
	at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$stats$1$$anonfun$apply$1.apply(DoubleRDDFunctions.scala:43)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
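For context on the ClassCastException: RankingMetrics expects every RDD element to be a pair of two sequences (predicted items, ground-truth items), while the join of rating rows with prediction rows produces scalar rating fields, hence "Long cannot be cast to Seq". A plain-Python sketch of the two shapes (toy values, no Spark needed):

```python
# Shape produced by ratings.join(predictions) keyed on (user, item):
# the value side is a pair of scalar ratings -- not sequences.
joined_element = ((1, 42), (5.0, 4.7))  # ((user, item), (true_rate, predicted_rate))

# Shape RankingMetrics expects: one element per user, each element a pair of
# (list of predicted items, list of ground-truth items).
ranking_element = ([42, 7, 13], [42, 13, 99])

pred_items, true_items = ranking_element
print(isinstance(pred_items, list) and isinstance(true_items, list))
```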
I tried different approaches based on RDDs and PySpark DataFrames to build, fit, and evaluate the model. Eventually I found that the "join" function does not help to build ratesAndPreds, because that input has to be constructed as an RDD of the right structure using spark.sparkContext.parallelize. I changed the function as follows:
def build_model_Als(self):
    data = self.load_from_redis()
    self.dataframe = pd.DataFrame({"user": data[0:, 0], "item": data[0:, 1], "rate": data[0:, 2]})
    train = self.dataframe.sample(frac=0.8, random_state=99)
    test = self.dataframe.loc[~self.dataframe.index.isin(train.index), :]
    ts = test.drop(columns=['rate'])
    logging.info("start train")
    als = ALS(rank=10, maxIter=10, regParam=0.01, userCol="user", itemCol="item", implicitPrefs=True,
              ratingCol="rate", coldStartStrategy="drop", nonnegative=True)
    self.model = als.fit(spark.createDataFrame(train))
    logging.info("done")
    # Collect, per user, the list of held-out items (assumes `test` is sorted by user).
    global_user = None
    user_list = np.array([], dtype=np.float64)
    true_list = dict()
    testItems = list()
    for row in test.iterrows():
        if row[1]['user'] != global_user:
            user_list = np.append(user_list, row[1]['user'])
            testItems = [int(row[1]['item'])]   # start a fresh list for the new user
            global_user = row[1]['user']
        else:
            testItems.append(int(row[1]['item']))
        true_list[global_user] = testItems
    pandasDf = pd.DataFrame({'user': user_list})
    sub_user = spark.createDataFrame(pandasDf)
    # Pair each user's top-30 recommendations with that user's ground-truth items.
    labelsList = list()
    for user, items in self.model.recommendForUserSubset(sub_user, 30).collect():
        predict_items = [i.item for i in items]
        labelsList.append((predict_items, true_list[user]))
    labels = spark.sparkContext.parallelize(labelsList)
    metrics = RankingMetrics(labels)
    print(metrics.meanAveragePrecision)
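The manual per-user loop above can be written more compactly with pandas groupby, which also removes the assumption that the test split is sorted by user. A sketch with made-up data (the real data comes from Redis):

```python
import pandas as pd

# Toy stand-in for the held-out test split.
test = pd.DataFrame({
    "user": [1.0, 1.0, 2.0, 2.0, 2.0],
    "item": [10, 11, 20, 21, 22],
    "rate": [1.0, 1.0, 1.0, 1.0, 1.0],
})

# user -> list of ground-truth items, regardless of row order.
true_list = test.groupby("user")["item"].apply(lambda s: [int(i) for i in s]).to_dict()
print(true_list)  # → {1.0: [10, 11], 2.0: [20, 21, 22]}
```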
The point is that spark.sparkContext.parallelize returns the right kind of RDD for RankingMetrics.meanAveragePrecision to find the intersection, whereas join does not. For example, spark.sparkContext.parallelize([([1, 2, 3, 4], [1, 2, 12, 13, 1]),]) produces a proper RDD for meanAveragePrecision, and the result is 40%.
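As a sanity check without a Spark session, average precision in the sense used by RankingMetrics (precision at each hit position, summed and divided by the number of distinct relevant items) can be sketched in plain Python; this is my reading of the formula, not the library code itself:

```python
def average_precision(pred, labels):
    """Average precision for one (predictions, ground truth) pair:
    sum precision@k over positions k where pred[k-1] is relevant,
    divided by the number of distinct relevant items."""
    lab_set = set(labels)
    hits = 0
    score = 0.0
    for k, item in enumerate(pred, start=1):
        if item in lab_set:
            hits += 1
            score += hits / k
    return score / len(lab_set) if lab_set else 0.0

print(average_precision([1, 2, 3], [1, 3]))  # → 0.8333333333333333
```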