AWS Kinesis Firehose + Lambda:如何处理同时发生的多个记录创建/更新

问题描述 投票:0回答:1

作为 Kinesis Firehose 工作流程的一部分,我有一个链接到我的 Kinesis 数据流的 Lambda 函数。 lambda 函数真正做的就是获取来自 DynamoDB 的序列化记录,并对它们进行反序列化(以及通常的 S3 编码/解码内容):

def lambda_handler(event, context):
    deserializer = TypeDeserializer()
    output = []
    try:
        for record in event["records"]:
            # Everything stored in S3 is base64 encoded, so we must first decode, deserialize, and
            # then encode the records again before we send them to S3
            decoded_payload = json.loads(base64.b64decode(record["data"]).decode())

            if decoded_payload["dynamodb"] and decoded_payload["dynamodb"]["NewImage"]:
                updated_record = decoded_payload["dynamodb"]["NewImage"]
                deserialized_record = {
                    k: deserializer.deserialize(v) for k, v in updated_record.items()
                }

                # Add newline after each record??? (otherwise Athena will only "see" the first?)
                encoded_record_with_line_break = (json.dumps(deserialized_record, cls=DecimalEncoder) + "\n").encode()
                output_record = {
                    "recordId": record["recordId"],
                    "result": "Ok",
                    "data": base64.b64encode(encoded_record_with_line_break).decode(),
                }
            else:
                print(
                    f"Dropping payload containing no updated DynamoDB image: {decoded_payload}"
                )
                output_record = {
                    "recordId": record["recordId"],
                    "result": "Dropped",
                    "data": record["data"],
                }

            output.append(output_record)

            print(f"Successfully processed {len(event['records'])} records.")
        except Exception as exc:
            print(f"Unhandled exception raised during data transformation: {exc}")
        finally:
            return {"records": output}

您可能已经注意到,我添加了 ' ' 在我对其进行编码并将其附加到输出数组之前添加到记录中。

这是因为我遇到了一个问题,我的 Glue Crawlers 没有注册我的 lambda 输出到 S3 的所有记录更新,特别是当同时创建/更新多个记录时。

基本上,Kinesis Data Firehose 发送内联 JSON 记录;它们不以逗号或换行符分隔。在与 AWS 支持取得联系后,他们发现了一篇 AWS 博文是这样说的:

默认情况下,Kinesis Data Firehose 以内联方式发送 JSON 记录,这会导致 Athena 仅查询每个 S3 对象中的第一条记录。为了克服这个问题,我们使用 Lambda 函数通过添加行尾 (EOL) 字符来转换记录,然后再将它们发送到 Amazon S3。

链接:https://aws.amazon.com/blogs/big-data/build-seamless-data-streaming-pipelines-with-amazon-kinesis-data-streams-and-amazon-kinesis-data-firehose-对于-amazon-dynamodb-表/

所以我们添加了换行符,正如您在我上面的代码中看到的那样。不幸的是,这似乎并没有解决我的问题——通过添加换行符,我注意到我的 lambda Cloudwatch 日志似乎在记录重复项。对单个记录的每次更新都会导致创建 2 个文件,而不仅仅是 1 个,并且由于数据格式无效 (HIVE_BAD_DATA),在 Athena 中查询这些记录将失败。

处理这种情况的正确方法是什么?

amazon-s3 aws-lambda amazon-dynamodb amazon-kinesis amazon-kinesis-firehose
1个回答
0
投票

我认为我的问题实际上源于我的原始函数中一些不正确的编码/解码逻辑。

你最终需要' ' 在每个处理过的记录之间,如我链接的文档中所述。

我还找到了一篇支持这个理论的文章,所以这是两个链接:

这是有效的 Python 代码:

decoded_payload = json.loads(base64.b64decode(record["data"]).decode())
if decoded_payload["dynamodb"] and decoded_payload["dynamodb"]["NewImage"]: 
    updated_record = decoded_payload["dynamodb"]["NewImage"]
    deserialized_record = {
        k: deserializer.deserialize(v) for k, v in updated_record.items()
    }
    # Add newline after each record, otherwise Athena will only "see" the first
    record_with_line_break = json.dumps(deserialized_record, cls=DecimalEncoder) + "\n"
    output_record = {
        "recordId": record["recordId"],
        "result": "Ok",
        "data": base64.b64encode(record_with_line_break.encode()),
    }
    output.append(output_record)
© www.soinside.com 2019 - 2024. All rights reserved.