Spark UnsupportedOperationException:空集合

问题描述 投票:0回答:4

在尝试使用 Databricks 提供的动手实验室执行 Spark mllib ALS 时,有谁知道此错误的可能原因吗?

14/11/20 23:33:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/11/20 23:33:39 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Got 27980 ratings from 24071 users on 4211 movies.
Training: 27989, validation: 0, test: 0
Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
    at MovieLensALS$.computeRmse(MovieLensALS.scala:149)
    at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply$mcVI$sp(MovieLensALS.scala:95)
    at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply$mcVD$sp(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at MovieLensALS$$anonfun$main$1.apply$mcVI$sp(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at MovieLensALS$.main(MovieLensALS.scala:93)
    at MovieLensALS.main(MovieLensALS.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

更新:当然可以!我正在使用这个类。它可在 https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.htmlhttps://databricks-training.s3.amazonaws.com/getting-started.html 中找到#额外需要下载。如果还有什么可以帮助的请告诉我

import java.io.File

import scala.io.Source

import org.apache.log4j.Logger
import org.apache.log4j.Level

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._`enter code here`
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

object MovieLensALS {

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    if (args.length != 2) {
      println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class MovieLensALS " +
        "target/scala-*/movielens-als-ssembly-*.jar movieLensHomeDir personalRatingsFile")
      sys.exit(1)
    }

    // set up environment

    val conf = new SparkConf()
      .setAppName("MovieLensALS")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    // load personal ratings

    val myRatings = loadRatings(args(1))
    val myRatingsRDD = sc.parallelize(myRatings, 1)

    // load ratings and movie titles

    val movieLensHomeDir = args(0)

    val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
      val fields = line.split("::")
      // format: (timestamp % 10, Rating(userId, movieId, rating))
      (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }

    val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
      val fields = line.split("::")
      // format: (movieId, movieName)
      (fields(0).toInt, fields(1))
    }.collect().toMap

    val numRatings = ratings.count()
    val numUsers = ratings.map(_._2.user).distinct().count()
    val numMovies = ratings.map(_._2.product).distinct().count()

    println("Got " + numRatings + " ratings from "
      + numUsers + " users on " + numMovies + " movies.")

    // split ratings into train (60%), validation (20%), and test (20%) based on the 
    // last digit of the timestamp, add myRatings to train, and cache them

    val numPartitions = 4
    val training = ratings.filter(x => x._1 < 6)
      .values
      .union(myRatingsRDD)
      .repartition(numPartitions)
      .cache()
    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
      .values
      .repartition(numPartitions)
      .cache()
    val test = ratings.filter(x => x._1 >= 8).values.cache()

    val numTraining = training.count()
    val numValidation = validation.count()
    val numTest = test.count()

    println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)

    // train models and evaluate them on the validation set

    val ranks = List(8, 12)
    val lambdas = List(0.1, 10.0)
    val numIters = List(10, 20)
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(training, rank, numIter, lambda)
      val validationRmse = computeRmse(model, validation, numValidation)
      println("RMSE (validation) = " + validationRmse + " for the model trained with rank = " 
        + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
      if (validationRmse < bestValidationRmse) {
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }

    // evaluate the best model on the test set

    val testRmse = computeRmse(bestModel.get, test, numTest)

    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

    // create a naive baseline and compare it with the best model

    val meanRating = training.union(validation).map(_.rating).mean
    val baselineRmse = 
      math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
    val improvement = (baselineRmse - testRmse) / baselineRmse * 100
    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

    // make personalized recommendations

    val myRatedMovieIds = myRatings.map(_.product).toSet
    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
    val recommendations = bestModel.get
      .predict(candidates.map((0, _)))
      .collect()
      .sortBy(- _.rating)
      .take(50)

    var i = 1
    println("Movies recommended for you:")
    recommendations.foreach { r =>
      println("%2d".format(i) + ": " + movies(r.product))
      i += 1
    }

    // clean up
    sc.stop()
  }

  /** Compute RMSE (Root Mean Squared Error). */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
      .join(data.map(x => ((x.user, x.product), x.rating)))
      .values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
  }

  /** Load ratings from file. */
  def loadRatings(path: String): Seq[Rating] = {
    val lines = Source.fromFile(path).getLines()
    val ratings = lines.map { line =>
      val fields = line.split("::")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }.filter(_.rating > 0.0)
    if (ratings.isEmpty) {
      sys.error("No ratings provided.")
    } else {
      ratings.toSeq
    }
  }
}
java scala apache-spark
4个回答
4
投票

可能是因为您(或computeRmse方法)正在使用某些过滤器,所以在空集合/RDD上调用reduce方法,因此抛出“空集合”。 尝试仔细检查过滤器或computeRmse()函数。


1
投票

我在使用相同的示例时遇到了相同的问题。问题是我使用的训练数据不够大并且没有足够的重复值。 ALS 模型只能预测训练数据中存在的用户、产品 ID 对。 (这与其他机器学习算法有所不同),因此如果验证集中的每一对都包含一个不在训练集中的 ID,则预测 RDD 将为空(因为它无法预测任何这些值)并且 rmse 方法中的归约转换将抛出此异常。为了避免这种情况,您应该:

A) 在没有足够的训练数据的情况下不要使用该算法,并且

B) 在进入“寻找最佳模型”循环之前检查您的验证集是否适用于训练该训练集。

最后,如果您要产品化此算法,请确保不要使用此方法返回的最佳模型,因为它可能不包含您的所有用户和产品 ID。如果是这样,那么您就限制了您可以预测的新用户和产品。我建议使用这种逻辑来辨别正确的训练参数,然后使用这些参数在所有数据上训练模型并使用它。


1
投票

我遇到了完全相同的异常。就我而言,这是代码中的一个错误,导致实际评级 RDD 的大小为零:) 通过将空评级 RDD 传递给 ALS.train,我绝对应该得到 UnsupportedOperationException:空集合


0
投票

补充@asu的答案。您可以使用

.reduceOption
而不是
.reduce
来防止调用空集合时发生错误。然后您只需处理该选项即可。

© www.soinside.com 2019 - 2024. All rights reserved.