更新:如果您阅读此文件以提高插入/更新速度,请先从python控制台运行
pymongo.has_c()
,以检查系统上是否启用了pymongo C扩展。如果解析为False
,则需要使用C扩展名编译pymongo或执行pip install --upgrade pymongo
将我的工作流程从10K行上的17秒改进到大约0.57秒。
我有成千上万个txt文件,其中包含我要导入到mongodb集合中的数百万行数据。
我当前正在使用以下def:
import re, pymongo
coll = pymongo.MongoClient().database.collection
rx = re.compile(r'[:; ]')
rx_email = re.compile(r'\S+@\S+\.\S+$')
def parser(path):
with open(path, "rb") as f:
for line in f:
try:
fields = rx.split(line.decode('utf-8'))
email = ''
username = ''
for field in fields:
if rx_email.match(field):
email = field
elif field != fields[-1]:
username = field
password = fields[-1]
if email:
coll.find_one_and_update({'email': email}, {'$addToSet': {'passwords': password}}, upsert=True)
elif username:
coll.find_one_and_update({'username': username}, {'$addToSet': {'passwords': password}}, upsert=True)
else:
pass
except UnicodeDecodeError:
pass
if __name__ == "__main__":
parser('path/to/file.txt')
[当我尝试在具有10K行的文件上运行脚本时,花了74.58974479999999秒。我认为这是由于插入时MongoDB必须匹配的项目数量导致的?在没有数据库交互的情况下运行相同的循环花费了0.022998秒。
EDIT:如Fast or Bulk Upsert in pymongo中的建议,我还尝试如下使用UpdateOne
和bulk_write
:
def parser(path):
ops = []
with open(path, "rb") as f:
for line in f:
if (len(ops) == 1000):
LOCAL_DB.bulk_write(ops, ordered=False)
ops = []
try:
fields = rx.split(line.decode('utf-8'))
email = ''
username = ''
for field in fields:
if rx_email.match(field):
email = field
elif field != fields[-1]:
username = field
password = fields[-1]
if email:
pass
ops.append((UpdateOne({'identifier': email}, {'$addToSet': {'passwords': password}}, upsert=True)))
elif username:
pass
ops.append((UpdateOne({'identifier': username}, {'$addToSet': {'passwords': password}}, upsert=True)))
else:
pass
except UnicodeDecodeError:
pass
完成10K行的时间为17秒,但这是减慢我尝试更新的文件和行的数量的方法。
有没有更好(并且希望更快)的方法?
某些要求:
更新:如果您阅读此书以提高插入/更新速度,请先通过从python控制台运行pymongo.has_c()来检查系统上是否启用了pymongo C扩展。如果解析为...
[我似乎在@JohnnyHK的一些指导下设法通过在初始代码中执行以下操作,将我的初始upsert
时间从10分钟的约74秒缩短为〜0.5秒,从我的初始代码: