使用 Pandas 和 PyMongo 将 MongoDB 数据加载到 DataFrame 的更好方法?

问题描述 投票:0回答:4

我有一个 0.7 GB 的 MongoDB 数据库,其中包含我试图加载到数据帧中的推文。但是,我收到错误。

MemoryError:    

我的代码如下所示:

cursor = tweets.find() #Where tweets is my collection
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)

我已经尝试了以下答案中的方法,这些方法在加载数据库之前会在某个时候创建数据库所有元素的列表。

然而,在另一个谈论 list() 的答案中,该人说这对于小数据集有好处,因为所有内容都加载到内存中。

就我而言,我认为这是错误的根源。数据太多,无法加载到内存中。我还可以使用什么其他方法?

python pandas pymongo
4个回答
11
投票

我已将代码修改为以下内容:

cursor = tweets.find(fields=['id'])
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)

通过在 find() 函数中添加 fields 参数,我限制了输出。这意味着我不会将每个字段加载到 DataFrame 中,而只会将选定的字段加载到 DataFrame 中。现在一切正常。


5
投票

从 mongodb 查询创建 DataFrame 的最快且可能是最节省内存的方法(如您的情况)将使用 monary

这篇文章有一个很好且简洁的解释。


3
投票

from_records
classmethod
可能是最好的方法:

from pandas import pd
import pymongo

client = pymongo.MongoClient()
data = db.mydb.mycollection.find() # or db.mydb.mycollection.aggregate(pipeline)

df = pd.DataFrame.from_records(data)

2
投票

一种优雅的做法如下:

import pandas as pd
def my_transform_logic(x):
    if x :
        do_something
        return result

def process(cursor):
    df = pd.DataFrame(list(cursor))
    df['result_col'] = df['col_to_be_processed'].apply(lambda value: my_transform_logic(value))

    #making list off dictionaries
    db.collection_name.insert_many(final_df.to_dict('records'))

    # or update
    db.collection_name.update_many(final_df.to_dict('records'),upsert=True)


#make a list of cursors.. you can read the parallel_scan api of pymongo

cursors = mongo_collection.parallel_scan(6)
for cursor in cursors:
    process(cursor)

我在上面的代码中使用 Joblib 在具有 260 万条记录的 mongoDB 集合上尝试了上述过程。我的代码没有抛出任何内存错误 2小时内处理完毕。

© www.soinside.com 2019 - 2024. All rights reserved.