I am trying to parallelize the AUC computation in Spark by using map and reduce instead of a for loop.
However, BinaryClassificationMetrics() only accepts an RDD as input, so inside the transformation I end up creating another RDD with sc.parallelize(recModelPredictionsAndLabels) — and an RDD cannot be used inside a transformation, because nested RDD operations are not allowed.
Here is my current code:
def computeRecommenderAUC(
    trainData: RDD[Rating],
    testData: RDD[Rating],
    bUserArtistMap: Broadcast[scala.collection.Map[Int, Set[Int]]],
    rank: Int,
    numIterations: Int,
    lambda: Double,
    alpha: Double
): Double = {
  // Train the recommender model using Spark's ALS algorithm
  val model = ALS.trainImplicit(
    ratings = trainData,
    rank = rank,
    iterations = numIterations,
    lambda = lambda,
    alpha = alpha
  )

  // Collect the distinct user IDs from the test data and broadcast them
  val bTestData = sc.broadcast(testData.map(r => r.user).distinct().collect())

  // Compute the per-user AUC in parallel
  val aucSum = sc.parallelize(bTestData.value)
    .map { user =>
      val actualArtists = bUserArtistMap.value(user)
      val recModelRecommendations = model.recommendProducts(user, 100)
      // Label each recommendation: 1.0 if the user actually listened to the artist, 0.0 otherwise
      val recModelPredictionsAndLabels = recModelRecommendations.map {
        case Rating(_, artist, rating) =>
          if (actualArtists.contains(artist)) (rating, 1.0)
          else (rating, 0.0)
      }
      val recModelMetrics = new BinaryClassificationMetrics(
        sc.parallelize(recModelPredictionsAndLabels)
      )
      recModelMetrics.areaUnderROC
    }
    .reduce(_ + _)

  // Average the per-user AUC values
  aucSum / bTestData.value.length
}
which causes the following error:
This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
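One workaround I am considering (a sketch only, not verified against my full pipeline): recModelPredictionsAndLabels is already a plain local collection inside the map, so instead of wrapping it in a new RDD for BinaryClassificationMetrics, the AUC could be computed locally on each executor. The LocalAUC object and its auc function below are hypothetical helpers of my own, not part of Spark; they use the Mann-Whitney U formulation of AUC (the probability that a randomly chosen positive scores above a randomly chosen negative, with ties counting half):

```scala
object LocalAUC {
  // AUC via the Mann-Whitney U statistic, computed on a local
  // (score, label) collection — no SparkContext needed, so it is
  // safe to call inside an RDD transformation.
  def auc(scoreAndLabels: Seq[(Double, Double)]): Double = {
    // Split scores by label (1.0 = positive, 0.0 = negative)
    val positives = scoreAndLabels.collect { case (s, l) if l == 1.0 => s }
    val negatives = scoreAndLabels.collect { case (s, l) if l == 0.0 => s }
    require(positives.nonEmpty && negatives.nonEmpty,
      "AUC is undefined unless both classes are present")
    // Count positive-vs-negative pairwise wins; ties count 0.5
    val wins = for (p <- positives; n <- negatives)
      yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (positives.size.toLong * negatives.size)
  }
}
```

With something like this, the body of the map would call LocalAUC.auc(recModelPredictionsAndLabels) in place of building a BinaryClassificationMetrics, which removes the nested sc.parallelize entirely. The pairwise loop is O(P·N) per user, which should be fine for 100 recommendations per user but would need a rank-based formulation for larger lists.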