我正在尝试使用spark mllib lda来总结我的文档语料库。
我的问题设置如下。
我有16台服务器(每台有20个内核和128GB内存)。
当我用OnlineLDAOptimizer
执行LDA时,它会给出内存不足错误,建议我增加spark.driver.maxResultSize
就像11个任务的序列化结果的总大小(1302 MB)大于spark.driver.maxResultSize
我将spark.driver.maxResultSize
增加到120GB(以及spark.driver.memory
增加到120GB)并重新运行LDA但不缺。它仍然说11个任务(120.1 GB)的序列化结果的总大小比spark.driver.maxResultSize大
我尝试了另外一个包含大约100,000个独特单词的数据集,并且它有效
那么,在使用Spark mllib LDA时,如何估计内存使用量?我在官方文档中找不到任何规范。
注意我使用稀疏向量来构造传递给qazxsw poi的文件qazxsw poi,但不知道spark lda是否可以在内部正确处理稀疏格式。
(编辑)我使用Scala版本的LDA。不是Python版本。
这可能是一个相关的问题,但没有给出明确的答案。 RDD[(Long, Vector)]
(编辑)的
这是我的代码片段(要点)。 LDA.run()
Spark LDA woes - prediction and OOM questions
实际上,我使用LDA来减少交易数据的尺寸。我的数据格式为https://gist.github.com/lucidfrontier45/11420721c0078c5b7415,其中def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
.flatMap {
// input file's format is (user_id, product_name, count)
case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
case _ => None
}.persist()
// Map to convert user_id or product_name into unique sequencential id
val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
val inverse_userid_map = userid_map.map(_.swap)
// broadcat to speedup RDD map operation
val b_userid_map = sc.broadcast(userid_map)
val b_productid_map = sc.broadcast(productid_map)
val b_inverse_userid_map = sc.broadcast(inverse_userid_map)
// run map
val transformed_src = src.map { case (u, p, r) =>
(b_userid_map.value(u), b_productid_map.value(p).toInt, r)
}
println("unique items = %d".format(b_productid_map.value.size))
// prepare for LDA input RDD[(LONG, Vector)]
val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) }
.groupByKey()
.map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }.persist()
documents.count()
src.unpersist()
// run Online Variational LDA
val ldamodel = new LDA()
.setK(args.k)
.setMaxIterations(args.n_iter)
.setOptimizer("online")
.run(documents)
.asInstanceOf[LocalLDAModel]
val result = ldamodel.topicDistributions(documents)
.map { case (i, v) =>
val u = b_inverse_userid_map.value(i)
"%d,%s".format(u, v.toArray.mkString(","))
}
result.saveAsTextFile(args.out)
}
是用户ID,(u, p, r)
是产品名称,u
是用户p
与r
交互的数字。在这种情况下,user对应于文档和产品。由于用户标识和产品名称是任意字符串,因此在提交给LDA之前,我将它们转换为唯一的顺序整数。
谢谢。
这个问题有三个常见原因,可能独立或串联起作用。
u
之类的东西向驱动程序返回大量数据。唉,一些SparkML代码就是这样做的。如果您不能责怪(2)或(3)以下问题,那么很可能是您的数据与p
实现相互作用的结果。collect
那样)。检查SparkUI中的任务数。另见OnlineLDAOptimizer
在堆栈跟踪上是collect
还是Exceeding `spark.driver.maxResultSize` without bringing any data to the driver?org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults
测试此方法的一种方法是将org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask
设置为0(无限制)并查看会发生什么。希望这可以帮助!