I am working with Python and MongoDB change streams, using the PyMongo library. I wrote the following code:
change_stream = self.client.watch(pipeline=pipeline, full_document='updateLookup', batch_size=100_000, resume_after=resume_token)
logging.debug('.. debugging code ..')
for document in change_stream:
    # Remember the latest resume token so the stream can be restarted from here
    resume_token = change_stream.resume_token
    logging.debug(resume_token)
    logging.debug(document['operationType'])
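The problem is that when several documents are inserted into MongoDB at once, iterating over the change stream still hands them to my code one at a time. Is there a way to get multiple documents from the change stream in a single iteration, so that I can process them together instead of one by one?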
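A PyMongo change stream yields one change event per iteration, and there is no built-in option that returns several events at once. The batch_size argument only controls how many events the server sends per network round trip; the cursor still hands them to your loop one by one.
If you want to process several at a time, you can collect events from the change stream into a batch yourself and then process the whole batch, like this: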
change_stream = self.client.watch(pipeline=pipeline, full_document='updateLookup', batch_size=100_000, resume_after=resume_token)
batch_size = 10  # Set your desired batch size here
batch = []
for document in change_stream:
    resume_token = change_stream.resume_token
    logging.debug(resume_token)
    logging.debug(document['operationType'])
    batch.append(document)
    # If the batch size is reached, process the batch and reset it
    if len(batch) >= batch_size:
        process_batch(batch)  # process_batch is your own handler function
        batch = []
# Process any remaining documents in the last batch.
# This point is reached only once the change stream has been closed,
# because the loop above blocks while it waits for new events.
if batch:
    process_batch(batch)
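One limitation of a plain for loop is that a partly filled batch can sit unprocessed for as long as no new events arrive. A possible workaround, shown here only as a rough sketch, is to poll with the change stream's try_next() method (which returns None when no event is ready) and flush the batch on a time limit as well as on size. The helper name iter_change_batches and the max_wait_seconds parameter are made up for this illustration and are not part of PyMongo; the sketch relies only on the stream's alive attribute and try_next() method.

```python
import time


def iter_change_batches(stream, batch_size=10, max_wait_seconds=1.0):
    """Yield lists of change events from a PyMongo-style change stream.

    Hypothetical helper: a batch is emitted when it holds batch_size events,
    or when max_wait_seconds have passed since its first event arrived,
    whichever happens first. batch_size is assumed to be at least 1.
    """
    batch = []
    deadline = None
    while stream.alive:
        change = stream.try_next()  # None means no event is ready right now
        if change is not None:
            if not batch:
                # Start the timer when the first event of a batch arrives
                deadline = time.monotonic() + max_wait_seconds
            batch.append(change)
        if batch and (len(batch) >= batch_size or time.monotonic() >= deadline):
            yield batch
            batch = []
    # The stream was closed: hand over whatever is left
    if batch:
        yield batch
```

With a real stream this could be used as `for batch in iter_change_batches(change_stream, batch_size=10, max_wait_seconds=2.0): process_batch(batch)`, reading change_stream.resume_token only after each batch has been processed so that a restart does not skip unprocessed events. How long each try_next() call waits on the server is governed by the max_await_time_ms argument of watch(), so the time limit here is approximate.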