我如何从AWS Kinesis Data Stream事件访问数据?

问题描述 投票:2回答:1

我正在使用消耗AWS Kinesis Data Stream的Python lambda。但是我在努力理解运动学记录事件的形式。例如:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

来源: Using AWS Lambda with Amazon Kinesis

我最初在此对象表示的运动学流上放置的数据在哪里?以及如何访问这些数据?

python python-3.x amazon-web-services aws-lambda amazon-kinesis
1个回答
4
投票

您放在流上的数据在每个记录的kinesis.data键上表示为Base64编码的字符串。例如(截断):

{
    "Records": [
        {
            "kinesis": {
                ...
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                ...
            },
            ...
        },
        {
            "kinesis": {
                ...
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                ...
            },
            ...
        }
    ]
}

要访问数据,请遍历每个Records对象,然后Base64解码kinesis.data值。

import base64


for record in event["Records"]:
    decoded_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")

    print(decoded_data)
    # Record 1: Hello, this is a test.
    # Record 2: This is only a test.

[注:此示例假定发送到运动流的数据最初是在运动b64对其进行编码之前utf-8进行编码的。

© www.soinside.com 2019 - 2024. All rights reserved.