使用PutRecords将多个记录加载到Kinesis-如何在失败的情况下仅重新发送失败的记录?

问题描述 投票:0回答:1

我正在使用Lambda将数据记录加载到Kinesis中,并且通常希望添加多达500K条记录,我将它们分成500个大块,并使用Boto的put_records方法将其发送到Kinesis。有时会由于超出允许的吞吐量而看到失败。

发生这种情况时重试的最佳方法是什么?理想情况下,我不希望数据流中出现重复的消息,因此,我不想简单地重新发送所有500条记录,但是我在努力查看如何仅重试失败的消息。 put_records方法的响应似乎不太有用。

我能否依靠响应记录列表的顺序与传递给putRecords的列表的顺序相同?

我知道我可以增加分片的数量,但我想显着增加向该Kinesis流加载数据的并行Lambda函数的数量。我们计划基于源系统对数据进行分区,但我不能保证多个功能不会将数据写入同一分片并且不会超过允许的吞吐量。结果,我不认为增加分片将消除对重试策略的需求。

或者,有人知道KPL是否会为我自动处理此问题吗?

amazon-web-services aws-lambda amazon-kinesis amazon-kinesis-kpl
1个回答
0
投票

我能否依靠响应记录列表的顺序与传递给putRecords的列表的顺序相同?

是。您将不得不依靠响应的顺序。响应记录的顺序与请求记录的顺序相同。

请检查putrecords响应,https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html

记录:一组成功和未成功处理的记录结果,通过自然顺序与请求相关。成功添加到流中的记录在结果中包含SequenceNumber和ShardId。未能添加到流的记录在结果中包含ErrorCode和ErrorMessage。

要重试失败的记录,您必须开发自己的重试机制。我用递归函数在python中使用以下方式编写了重试机制,并在重试之间使用了增量等待。

import boto3
import time

kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"

def send_to_stream(kinesis_records, retry_count):
    put_response = kinesis_client.put_records(
        Records=kinesis_records,
        StreamName=KINESIS_STREAM_NAME
    )
    failed_count = put_response['FailedRecordCount']
    if failed_count > 0:
        if retry_count > 0:
            retry_kinesis_records = []
            for idx, record in enumerate(put_response['Records']):
                if 'ErrorCode' in record:
                    retry_kinesis_records.append(kinesis_records[idx])
            time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
            send_to_stream(retry_kinesis_records, retry_count - 1)
        else:
            print(f'Not able to put records after retries. Records = {put_response["Records"]}')

在上述示例中,您可以根据需要更改KINESIS_RETRY_COUNTKINESIS_RETRY_WAIT_IN_SEC。另外,您还必须确保lambda超时足以重试。

或者,有人知道KPL是否会自动处理此问题吗?对我来说问题?

我不确定KPL,但是从文档看来,它具有自己的重试机制。 https://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-retries-rate-limiting.html

© www.soinside.com 2019 - 2024. All rights reserved.