我有一个java应用程序需要从MongoDB 3.2读取大量数据并将其传输到Hadoop。
该批次应用程序每4小时运行6次,每天6次。
数据规格:
目前我正在使用MongoTemplate和Morphia来访问MongoDB。但是,在使用以下内容处理此数据时,我收到OOM异常:
List<MYClass> datalist = datasource.getCollection("mycollection").find().asList();
读取此数据并填充到Hadoop的最佳方法是什么?
MongoTemplate::Stream()
一个接一个地写信给Hadoop?batchSize(someLimit)
并将整个批次写入Hadoop?Cursor.batch()
并逐一写入hdfs?你的问题在于asList()
电话
这会强制驱动程序遍历整个游标(80,000个文档,几个Gigs),将所有内容保留在内存中。
无论批量大小如何,batchSize(someLimit)
和Cursor.batch()
都无法帮助您遍历整个光标。
相反,你可以:
1)迭代光标:List<MYClass> datalist = datasource.getCollection("mycollection").find()
2)一次读取一个文档并将文档送入缓冲区(比如列表)
3)对于每1000个文件(比如说)调用Hadoop API,清除缓冲区,然后重新开始。
asList()
调用将尝试将整个Mongodb集合加载到内存中。试图使内存列表对象大于3GB的大小。
使用游标迭代集合将解决此问题。您可以使用Datasource类执行此操作,但我更喜欢Morphia为DAO类提供的类型安全抽象:
class Dao extends BasicDAO<Order, String> {
Dao(Datastore ds) {
super(Order.class, ds);
}
}
Datastore ds = morphia.createDatastore(mongoClient, DB_NAME);
Dao dao = new Dao(ds);
Iterator<> iterator = dao.find().fetch();
while (iterator.hasNext()) {
Order order = iterator.next;
hadoopStrategy.add(order);
}