我想介绍基于documentdb、opensearch和lambda(AWS云)的全文查询。我准备了一个小架构:
我正在通过 mongoshell 将文档插入到我的集合中来测试数据流。我逐一插入文档(10 次),但第 5 个文档未插入到 OpenSearch 中。我还做了一些其他测试,我几乎可以肯定并非所有项目都在 OpenSearch 中编入索引。
编辑 - Labda 负责读取流并将数据推送到操作系统:
client = pymongo.MongoClient(host, port, username, password, ssl)
db = client['my_db']
coll = db.get_collection('my_collection', read_preference=pymongo.ReadPreference.PRIMARY)
stream = coll.watch()
try:
change = stream.next()
if change is not None:
document = change['fullDocument']
myObject = document['myObject']
headers = { "Content-Type": "application/json" }
query = {
'id': document['docId'],
'name': myObject['docName']
}
url = 'url-to-openSearch-domain/my-index/_doc/' + document['docId']
basic = HTTPBasicAuth(username, password)
r = requests.put(url, auth=basic, headers, data=json.dumps(query))
我只会补充一点,我也测试了stream.try_next(),但它始终返回 None 。
我尝试了一些设置,但仍然存在数据丢失(lambda 未触发或其他问题)。 我的设置:
有人知道导致数据丢失的根本原因是什么吗?也许changeStream正在变得很快而lambda无法捕获它?
我发现本教程很有用,并且可以通过减少 lambda 函数的编码来解决您的问题。 https://docs.aws.amazon.com/lambda/latest/dg/with-documentdb-tutorial.html#docdb-create-the-lambda-event-source-mapping
按照以下配置创建事件源映射:
DocumentDB 集群 – 选择您之前创建的集群。
数据库名称 – docdbdemo
系列名称 – 产品
批量大小 – 1
起始位置 – 最新
身份验证 – BASIC_AUTH
Secrets Manager 键 – 选择您刚刚创建的 DocumentDBSecret。
批处理窗口 – 1
完整文档配置 – UpdateLookup
不要忘记选中复选框上的激活。
配置是 Lambda 触发器观察 DocumentDB 上的数据库/集合上的最新数据更改并将事件发送到 lambda 函数的关键信息。
因此,您只需要处理事件并从事件对象中获取数据即可。 这是您可以尝试的示例。
def lambda_handler(event, context):
data=event['events'][0]['event']['fullDocument']
print(data)
return {
'statusCode': 200,
'body': data
}