我有一个 0.7 GB 的 MongoDB 数据库,其中包含我试图加载到数据帧中的推文。但是,我收到错误。
MemoryError:
我的代码如下所示:
cursor = tweets.find() #Where tweets is my collection
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)
我已经尝试了以下答案中的方法,这些方法在加载数据库之前会在某个时候创建数据库所有元素的列表。
然而,在另一个谈论 list() 的答案中,该人说这对于小数据集有好处,因为所有内容都加载到内存中。
就我而言,我认为这是错误的根源。数据太多,无法加载到内存中。我还可以使用什么其他方法?
我已将代码修改为以下内容:
cursor = tweets.find(fields=['id'])
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)
通过在 find() 函数中添加 fields 参数,我限制了输出。这意味着我不会将每个字段加载到 DataFrame 中,而只会将选定的字段加载到 DataFrame 中。现在一切正常。
from_records
classmethod
可能是最好的方法:
from pandas import pd
import pymongo
client = pymongo.MongoClient()
data = db.mydb.mycollection.find() # or db.mydb.mycollection.aggregate(pipeline)
df = pd.DataFrame.from_records(data)
一种优雅的做法如下:
import pandas as pd
def my_transform_logic(x):
if x :
do_something
return result
def process(cursor):
df = pd.DataFrame(list(cursor))
df['result_col'] = df['col_to_be_processed'].apply(lambda value: my_transform_logic(value))
#making list off dictionaries
db.collection_name.insert_many(final_df.to_dict('records'))
# or update
db.collection_name.update_many(final_df.to_dict('records'),upsert=True)
#make a list of cursors.. you can read the parallel_scan api of pymongo
cursors = mongo_collection.parallel_scan(6)
for cursor in cursors:
process(cursor)
我在上面的代码中使用 Joblib 在具有 260 万条记录的 mongoDB 集合上尝试了上述过程。我的代码没有抛出任何内存错误 2小时内处理完毕。