我有一个mongo db,其中包含一个名为“ ActiveTracking”的集合,其自定义键为“ dates”。定期,我会批量获取新文档,其中可能包含重复的“日期”和新的“日期”。
我的记录字典看起来像这样:
dicto = [{'_id': Timestamp('2004-02-25 00:00:00'),
'low': 2.809999942779541,
'volume': 12800,
'open': 2.9000000953674316,
'high': 2.9000000953674316,
'close': 2.819999933242798,
'adjclose': 1.5342552661895752,
'dividends': 0.0},
{'_id': Timestamp('2004-02-26 00:00:00'),
'low': 2.819999933242798,
'volume': 59500,
'open': 2.8499999046325684,
'high': 2.9000000953674316,
'close': 2.890000104904175,
'adjclose': 1.572339653968811,
'dividends': 0.0},]
例如,第一条记录在数据库中,第二条记录不在数据库中。如果我这样做:
collection = db["STOCK"]
collection.insert_many(dicto, ordered=False)
返回
BulkWriteError: batch op errors occurred
我的收藏集中有成千上万的记录,我收到的“批量”中可能包含100(与示例中显示的2相对)。无论如何,是否只将唯一的ID批量写入数据库?
更新以下代码可能会起作用,但是在插入之前,我试图避免对要插入的字典进行迭代(以检查重复项)。我宁愿不要在冗长的列表上进行迭代的解决方案来区分要插入的内容,因为这可能会很耗时。
to_be_inserted = []
for d in dicto:
x = collection.find_one(d)
if type(x) != dict:
to_be_inserted.append(d)
collection.insert_many(to_be_inserted)
以下伪代码应该起作用。使用find_one
检查记录是否存在,如果不存在该记录,则将其添加到to_be_inserted
列表中。插最后全部批处理。
由于_id
始终默认为索引,因此您将在find_one
上获得非常快的性能。
如果您知道时间戳的某些属性,则可以进一步优化通过跟踪最旧和最新的时间戳并查看传入的时间戳是否为在该范围之内或之外。
to_be_inserted = []
for d in candidate_records:
if col.find_one(d["_id"]):
continue
else:
to_be_inserted.append(d)
if len(d) > 0:
col.insert_many(to_be_inserted)
to_be_inserted = []
这被称为unordered bulk write。