如何调整spark应用程序以避免OOM异常

问题描述 投票:1回答:1

我使用Spark 2.0.2。

我正在尝试运行一个对已经创建的模型进行预测的spark应用程序。

群集信息:m4.2xlarge 16个vCPU,32个GiB内存,仅EBS存储EBS存储:1000 GiB

根据here的建议,我制作了一个Google-Spreadsheet来计算调整参数。

无论我尝试什么,我都会得到以下2个例外情况:

  1. 由于超过内存限制而被YARN杀死的容器。使用10.0 GB的10 GB物理内存。考虑提升spark.yarn.executor.memoryOverhead。
  2. 执行者心跳在159126 ms后超时

下面是我试图执行的代码

val allGears = sc.textFile(allGearsFilePath)
val allUsers = sc.textFile(allUsersFilePath)
val allUserItems = allUsers.cartesian(allGears).map{ case(x,y) => (x.toInt, y.toInt)}
allUserItems.cache()


val gearPurchased = sc.textFile(gearPurchaseRating)
val gearAddedToCart = sc.textFile(gearAddToCartRating)
val gearShoppingUserToItem = gearPurchased.map(_.split(',') match   { case Array(user, item, rate) => (user.toInt, item.toInt) })
gearShoppingUserToItem.cache()
val allUserItemToGearShoppingUnion = allUserItems.union(gearShoppingUserToItem)
val allUserItemToGearShoppingIntersection = allUserItems.intersection(gearShoppingUserToItem)
val FinalSubtraction = allUserItems.subtract(gearShoppingUserToItem)
val nonPurchasedGears = FinalSubtraction
nonPurchasedGears.cache()
allUserItems.unpersist()
gearShoppingUserToItem.unpersist()
val out = model.predict(nonPurchasedGears)

当我尝试预测用户可以购买的齿轮时,我得到了例外。

下面是我运行的spark-submit命令

spark-submit --jars jedis-2.7.2.jar,commons-pool2-2.3.jar,spark-redis-0.3.2.jar,SparkHBase.jar,recommendcontentslib_2.11-1.0.jar --class org.digitaljuice.itemrecommender.RecommendGears --master yarn --driver-memory 2g --num-executors 5 --executor-memory 9g --executor-cores 5 --conf spark.yarn.executor.memoryOverhead=1024 recommendersystem_2.11-0.0.1.jar /work/output/gearpurchaserating/part-00000 /work/output/gearaddtocartrating/part-00000 /work/output/allGears/part-00000 /work/output/allAccounts/part-00000 /work/allaccounts/acc_toacc/part-m-00000 /work/Recommendations/ /work/TrainingModel

如何调整应用程序以使其运行并进行预测?我尝试了各种各样的东西,但似乎没有任何工作,所以我猜我没有正确调整应用程序。请帮助。

谢谢

scala apache-spark rdd apache-spark-mllib
1个回答
2
投票

好的,所以我不会继续留在评论轨道中,而是直接进入一个有目的的解决方案,这也需要对代码进行一些清理。

val allGears = spark.read.csv(allGearsFilePath)
val allUsers = spark.read.csv(allUsersFilePath)
val allUserItems = allUsers.crossJoin(allGears).map{case Row(x: String,y: String) => (x.toInt, y.toInt)}.persist(StorageLevel.MEMORY_AND_DISK)

val gearPurchased = spark.read.csv(gearPurchaseRating)

val gearShoppingUserToItem = gearPurchased.map{case Row(x: String,y: String) => (x.toInt, y.toInt)}.persist(StorageLevel.MEMORY_AND_DISK)
val nonPurchasedGears = allUserItems.except(gearShoppingUserToItem).cache()

val gearAddedToCart = spark.read.csv(gearAddToCartRating) // NOT USED
val allUserItemToGearShoppingUnion = allUserItems.union(gearShoppingUserToItem) // NOT USED
val allUserItemToGearShoppingIntersection = allUserItems.intersect(gearShoppingUserToItem) // NOT USED

allUserItems.unpersist()
gearShoppingUserToItem.unpersist()

val out = model.predict(nonPurchasedGears)

在您的示例中有许多未使用的变量 - 我已将它们留在那里以防您以后需要它们。如果您不需要它们,请将它们删除。 (另外,如果你删除它们,没有理由缓存任何数据帧,你也可以简单地从代码中删除所有persistcache。)

无论如何,回到问题 - 如果你仍然遇到OOM,你可以尝试一些事情:

  • 增加memoryOverhead。在Spark 2.x中,关闭堆内存的使用量增加,通常需要增加memoryOverhead。尝试将其增加到4096(请注意,您可能需要降低--executor-memory,以便不超过可用内存)。
  • 在执行except之前,将数据重新分区为更多分区
  • 通过在调用allUserItems.count之前添加gearShoppingUserToItem.countexcept来强制评估和缓存持久化的数据帧。我知道这听起来很奇怪,但它会经常解决OOM问题,并且还会显着加快代码速度。

我希望这有帮助 :)

© www.soinside.com 2019 - 2024. All rights reserved.