通过 lambda 处理 documentdb 流期间丢失数据

问题描述 投票:0回答:1

我想介绍基于documentdb、opensearch和lambda(AWS云)的全文查询。我准备了一个小架构:

  1. Documentdb 集群,有两个实例,changeStreams 设置为 true
  2. 带有 python 脚本的 Lambda
  3. OpenSearch,我正在推送从 documentdb 流捕获的更改

我正在通过 mongoshell 将文档插入到我的集合中来测试数据流。我逐一插入文档(10 次),但第 5 个文档未插入到 OpenSearch 中。我还做了一些其他测试,我几乎可以肯定并非所有项目都在 OpenSearch 中编入索引。

编辑 - Labda 负责读取流并将数据推送到操作系统:

client = pymongo.MongoClient(host, port, username, password, ssl)

    db = client['my_db']
    coll = db.get_collection('my_collection', read_preference=pymongo.ReadPreference.PRIMARY)
    stream = coll.watch()

    try:
        change = stream.next() 
        if change is not None:
            document = change['fullDocument']
            myObject = document['myObject']

            headers = { "Content-Type": "application/json" }
            query = {
                'id': document['docId'],
                'name': myObject['docName']
            }
            url = 'url-to-openSearch-domain/my-index/_doc/' + document['docId']
 
            basic = HTTPBasicAuth(username, password)
            r = requests.put(url, auth=basic, headers, data=json.dumps(query))

我只会补充一点,我也测试了stream.try_next(),但它始终返回 None 。

我尝试了一些设置,但仍然存在数据丢失(lambda 未触发或其他问题)。 我的设置:

  • Lambda:1024MB 内存,超时 15 分钟,预配置并发 - 10,
  • 触发器:批量大小1000,完整文档配置:updateLookup

有人知道导致数据丢失的根本原因是什么吗?也许changeStream正在变得很快而lambda无法捕获它?

amazon-web-services elasticsearch aws-lambda full-text-search aws-documentdb
1个回答
-1
投票

我发现本教程很有用,并且可以通过减少 lambda 函数的编码来解决您的问题。 https://docs.aws.amazon.com/lambda/latest/dg/with-documentdb-tutorial.html#docdb-create-the-lambda-event-source-mapping

按照以下配置创建事件源映射:

DocumentDB 集群 – 选择您之前创建的集群。

数据库名称 – docdbdemo

系列名称 – 产品

批量大小 – 1

起始位置 – 最新

身份验证 – BASIC_AUTH

Secrets Manager 键 – 选择您刚刚创建的 DocumentDBSecret。

批处理窗口 – 1

完整文档配置 – UpdateLookup

不要忘记选中复选框上的激活。

配置是 Lambda 触发器观察 DocumentDB 上的数据库/集合上的最新数据更改并将事件发送到 lambda 函数的关键信息。

因此,您只需要处理事件并从事件对象中获取数据即可。 这是您可以尝试的示例。

    def lambda_handler(event, context):
        data=event['events'][0]['event']['fullDocument']
        print(data)
        return {
            'statusCode': 200,
            'body': data
        }
© www.soinside.com 2019 - 2024. All rights reserved.