我有一个 s3 桶,里面有一些 10M 的对象,所有这些对象都需要处理并发送到 opensearch。为此,我正在评估 kenisis 是否可以用于此目的。
网上的解决方案似乎暗示使用 lambda,但由于我有 10M 个对象,我认为该函数将在 for 循环耗尽时超时。
所以,我想要的设置是:
s3 --> (some producer) --> kenesis data streams --> opensearch (destination)
请问解决这个问题的最佳方法是什么
这里有一篇关于该主题的官方博客文章,建议使用 DMS 服务将 S3 中的现有文件发送到 Kinesis。
至于使用 Lambda 的所有建议,这些建议适用于文件尚未在 S3 中的场景,并且每次将文件上传到 S3 时都会触发 Lambda,只处理该文件。没有人建议您使用 Lambda 在单个函数调用中处理 10M 现有 S3 文件。
如果您想在当前场景中使用 Lambda,您可以先创建所有 S3 对象的列表,然后编写一个一次性脚本,将该对象列表提供给 SQS 队列。然后,您可以拥有一个 Lambda 函数,随着时间的推移处理队列中的消息,从队列中获取对象键,从 S3 读取文件,并将其发送到 Kinesis。 Lambda 一次最多可以分批处理 10 个。然后,您可以将 S3 存储桶配置为将新对象通知发送到同一个 SQS 队列,Lambda 将自动处理您稍后添加到存储桶中的任何新对象。
Mark B 的回答绝对是一个可行的选择,我建议配置您的 SQS 队列以针对每条消息触发 Lambda。
除非您需要 Kinesis 来实现某些 ETL 功能,否则您很可能可以直接从 S3 转到 OpenSearch。
假设 S3 中的文档格式适合 OpenSearch,我将采用以下方法之一:
使用适用于 Pandas 的 AWS 开发工具包,您可能会像这样实现您正在寻找的东西...
import awswrangler as wr
from opensearchpy import OpenSearch
items = wr.s3.read_json(path="s3://my-bucket/my-folder/")
# connect + upload to OpenSearch
my_client = OpenSearch(...)
wr.opensearch.index_df(client=my_client, df=items)
适用于 Pandas 的 AWS SDK 可以迭代 S3 项目块,并且有一个关于将 JSON(和其他文件类型)从 S3 索引到 OpenSearch 的教程。