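I wrote a function that checks whether a row exists, inserts it if it does not, and updates it if it does. To do the existence check I tried to use `count_documents`. I use a field named `filetype` as my unique key.
A sample document looks like this: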
    {
        "_id": ObjectId("652d47a64732d257ee31e846"),
        "filetype": "USTreasuryBillRates",
        "last_date_processed": "2023-10-16T00:00:00.000Z",
        "update_date": "2023-10-16T10:13:45.000Z"
    }
Python function:
    def upsert_load_history(arg_file_type, arg_date, arg_db_collection):
        # insert if not there, otherwise update in place
        print("===== upsert_load_history ======")
        json_query = {"filetype": arg_file_type}
        row_count = arg_db_collection.count_documents(json_query, {'limit': 1})
        if row_count == 0:
            insert_load_history_new_rows(arg_file_type, arg_date, arg_db_collection)
        else:
            update_load_history_for_file_type(arg_file_type, arg_date, arg_db_collection)
Error:
    ===== upsert_load_history ======
    Traceback (most recent call last):
      File "C:\GitHub\ClientName\Apps\LambdaUSTreasury\SetupLoadHistoryDB.py", line 134, in <module>
        upsert_load_history(file_type, file_date, db_collection_filehistory)
      File "C:\GitHub\ClientName\Apps\LambdaUSTreasury\SetupLoadHistoryDB.py", line 61, in upsert_load_history
        row_count = arg_db_collection.count_documents(json_query, {'limit': 1})
      File "C:\GitHub\ClientName\Apps\LambdaUSTreasury\pymongo\collection.py", line 1786, in count_documents
        _cmd, self._read_preference_for(session), session)
      File "C:\GitHub\ClientName\Apps\LambdaUSTreasury\pymongo\common.py", line 870, in _read_preference_for
        return session._txn_read_preference() or self.__read_preference
    AttributeError: 'dict' object has no attribute '_txn_read_preference'
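Following JonRSharpe's comment, here is my corrected code. The fix was to change `{'limit': 1}` to `limit=1`. As the traceback shows, the second positional argument of `count_documents` is treated as the session, so the dict was being used as a session object, which is why the `AttributeError` mentions `_txn_read_preference`.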
    import datetime

    def upsert_load_history(arg_file_type, arg_date, arg_db_collection):
        # insert if not there, otherwise update in place
        print("===== upsert_load_history ======")
        update_date = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
        json_query = {"filetype": arg_file_type}
        # limit must be passed as a keyword argument, not as a positional dict
        row_count = arg_db_collection.count_documents(json_query, limit=1)
        if row_count == 0:
            insert_load_history_new_rows(arg_file_type, arg_date, arg_db_collection)
        else:
            update_load_history_for_file_type(arg_file_type, arg_date, arg_db_collection)
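To make the cause of the original error more concrete, here is a small sketch that does not touch MongoDB at all. The function `count_documents_like` is a hypothetical stand-in that only mirrors the parameter shape `(filter, session=None, **kwargs)` implied by the traceback; it is not PyMongo's implementation. It shows where each argument ends up for the two call styles.

```python
import inspect


def count_documents_like(filter, session=None, **kwargs):
    """Hypothetical stand-in: same parameter shape as the call in the traceback."""


sig = inspect.signature(count_documents_like)

# Original call style: the options dict is passed positionally,
# so it is bound to the `session` parameter.
bad = sig.bind({"filetype": "USTreasuryBillRates"}, {"limit": 1})

# Corrected call style: `limit` is a keyword argument,
# so it is collected into **kwargs and `session` is left unset.
good = sig.bind({"filetype": "USTreasuryBillRates"}, limit=1)

print("positional dict ->", dict(bad.arguments))
print("keyword limit   ->", dict(good.arguments))
```

In the first binding the dict shows up under `session`, which matches the `'dict' object has no attribute '_txn_read_preference'` message; in the second it shows up under `kwargs`.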
In addition, thanks to Belly Buster's comment, I discovered the "upsert" feature. Below is a new method I wrote to test it, and it works well.
    import datetime
    import pprint

    def update2_load_history_for_file_type(arg_file_type, arg_date, arg_db_collection):
        # do update without retrieve first, using the upsert feature
        print("===== update2_load_history_for_file_type ======")
        # Note, in my collection, filetype is my unique key
        json_query = {'filetype': arg_file_type}
        update_date = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
        json_update = {
            '$set': {
                'last_date_processed': arg_date,
                'update_date': update_date
            }
        }
        print("json_query:")
        pprint.pprint(json_query)
        print("json_update:")
        pprint.pprint(json_update)
        # upsert=True inserts a new document when no document matches json_query
        result = arg_db_collection.update_one(json_query, json_update, upsert=True)
        # matched_count is 0 when the upsert inserted a new document
        print("Update result.matched_count=", result.matched_count)
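One thing to keep in mind with the upsert version: printing only `matched_count` does not tell you whether a new document was inserted, because it is 0 in that case. Below is a rough sketch of how the outcome could be classified from the `matched_count`, `modified_count`, and `upserted_id` attributes of the result returned by `update_one`. The helper name `describe_upsert` is my own invention, and the `SimpleNamespace` objects are stand-ins for real results so the snippet runs without a database.

```python
from types import SimpleNamespace


def describe_upsert(result):
    """Classify the outcome of update_one(..., upsert=True).

    `result` is expected to expose upserted_id, matched_count and
    modified_count, as PyMongo's UpdateResult does.
    """
    if result.upserted_id is not None:
        return "inserted"      # no match was found, a new document was created
    if result.modified_count > 0:
        return "updated"       # a match was found and its fields changed
    return "unchanged"         # a match was found but the values were identical


# Stand-in results (hypothetical values, not real database output).
inserted = SimpleNamespace(upserted_id="some-id", matched_count=0, modified_count=0)
updated = SimpleNamespace(upserted_id=None, matched_count=1, modified_count=1)
unchanged = SimpleNamespace(upserted_id=None, matched_count=1, modified_count=0)

for r in (inserted, updated, unchanged):
    print(describe_upsert(r))
```

In the real function this would be called as `describe_upsert(result)` right after the `update_one` call.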