Spark LDA消耗太多内存

问题描述 投票:12回答:1

我正在尝试使用spark mllib lda来总结我的文档语料库。

我的问题设置如下。

  • 大约100,000份文件
  • 大约400,000个独特的单词
  • 100集群

我有16台服务器(每台有20个内核和128GB内存)。

当我用OnlineLDAOptimizer执行LDA时,它会给出内存不足错误,建议我增加spark.driver.maxResultSize就像11个任务的序列化结果的总大小(1302 MB)大于spark.driver.maxResultSize

我将spark.driver.maxResultSize增加到120GB(以及spark.driver.memory增加到120GB)并重新运行LDA但不缺。它仍然说11个任务(120.1 GB)的序列化结果的总大小比spark.driver.maxResultSize大

我尝试了另外一个包含大约100,000个独特单词的数据集,并且它有效

那么,在使用Spark mllib LDA时,如何估计内存使用量?我在官方文档中找不到任何规范。

注意我使用稀疏向量来构造传递给qazxsw poi的文件qazxsw poi,但不知道spark lda是否可以在内部正确处理稀疏格式。

(编辑)我使用Scala版本的LDA。不是Python版本。

这可能是一个相关的问题,但没有给出明确的答案。 RDD[(Long, Vector)]

(编辑)的

这是我的代码片段(要点)。 LDA.run()

Spark LDA woes - prediction and OOM questions

实际上,我使用LDA来减少交易数据的尺寸。我的数据格式为https://gist.github.com/lucidfrontier45/11420721c0078c5b7415,其中def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = { val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t")) .flatMap { // input file's format is (user_id, product_name, count) case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble)) case _ => None }.persist() // Map to convert user_id or product_name into unique sequencential id val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap val inverse_userid_map = userid_map.map(_.swap) // broadcat to speedup RDD map operation val b_userid_map = sc.broadcast(userid_map) val b_productid_map = sc.broadcast(productid_map) val b_inverse_userid_map = sc.broadcast(inverse_userid_map) // run map val transformed_src = src.map { case (u, p, r) => (b_userid_map.value(u), b_productid_map.value(p).toInt, r) } println("unique items = %d".format(b_productid_map.value.size)) // prepare for LDA input RDD[(LONG, Vector)] val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) } .groupByKey() .map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }.persist() documents.count() src.unpersist() // run Online Variational LDA val ldamodel = new LDA() .setK(args.k) .setMaxIterations(args.n_iter) .setOptimizer("online") .run(documents) .asInstanceOf[LocalLDAModel] val result = ldamodel.topicDistributions(documents) .map { case (i, v) => val u = b_inverse_userid_map.value(i) "%d,%s".format(u, v.toArray.mkString(",")) } result.saveAsTextFile(args.out) } 是用户ID,(u, p, r)是产品名称,u是用户pr交互的数字。在这种情况下,user对应于文档和产品。由于用户标识和产品名称是任意字符串,因此在提交给LDA之前,我将它们转换为唯一的顺序整数。

谢谢。

apache-spark apache-spark-mllib lda
1个回答
0
投票

这个问题有三个常见原因,可能独立或串联起作用。

  1. 该作业使用u之类的东西向驱动程序返回大量数据。唉,一些SparkML代码就是这样做的。如果您不能责怪(2)或(3)以下问题,那么很可能是您的数据与p实现相互作用的结果。
  2. 这项工作涉及大量任务,每项任务都会将结果作为Spark职位管理的一部分返回给司机(而不是像collect那样)。检查SparkUI中的任务数。另见OnlineLDAOptimizer在堆栈跟踪上是collect还是Exceeding `spark.driver.maxResultSize` without bringing any data to the driver
  3. 估计错误:Spark显着高估了即将返回给驱动程序的数据大小,并抛出此错误以防止群集的驱动程序来自OOM。请参阅org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults测试此方法的一种方法是将org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask设置为0(无限制)并查看会发生什么。

希望这可以帮助!

© www.soinside.com 2019 - 2024. All rights reserved.