我正在使用Lambda将数据记录加载到Kinesis中,并且通常希望添加多达500K条记录,我将它们分成500个大块,并使用Boto的put_records
方法将其发送到Kinesis。有时会由于超出允许的吞吐量而看到失败。
发生这种情况时重试的最佳方法是什么?理想情况下,我不希望数据流中出现重复的消息,因此,我不想简单地重新发送所有500条记录,但是我在努力查看如何仅重试失败的消息。 put_records
方法的响应似乎不太有用。
我能否依靠响应记录列表的顺序与传递给putRecords的列表的顺序相同?
我知道我可以增加分片的数量,但我想显着增加向该Kinesis流加载数据的并行Lambda函数的数量。我们计划基于源系统对数据进行分区,但我不能保证多个功能不会将数据写入同一分片并且不会超过允许的吞吐量。结果,我不认为增加分片将消除对重试策略的需求。
或者,有人知道KPL是否会为我自动处理此问题吗?
我能否依靠响应记录列表的顺序与传递给putRecords的列表的顺序相同?
是。您将不得不依靠响应的顺序。响应记录的顺序与请求记录的顺序相同。
请检查putrecords
响应,https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html。
记录:一组成功和未成功处理的记录结果,通过自然顺序与请求相关。成功添加到流中的记录在结果中包含SequenceNumber和ShardId。未能添加到流的记录在结果中包含ErrorCode和ErrorMessage。
要重试失败的记录,您必须开发自己的重试机制。我用递归函数在python中使用以下方式编写了重试机制,并在重试之间使用了增量等待。
import boto3
import time
kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"
def send_to_stream(kinesis_records, retry_count):
put_response = kinesis_client.put_records(
Records=kinesis_records,
StreamName=KINESIS_STREAM_NAME
)
failed_count = put_response['FailedRecordCount']
if failed_count > 0:
if retry_count > 0:
retry_kinesis_records = []
for idx, record in enumerate(put_response['Records']):
if 'ErrorCode' in record:
retry_kinesis_records.append(kinesis_records[idx])
time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
send_to_stream(retry_kinesis_records, retry_count - 1)
else:
print(f'Not able to put records after retries. Records = {put_response["Records"]}')
在上述示例中,您可以根据需要更改KINESIS_RETRY_COUNT
和KINESIS_RETRY_WAIT_IN_SEC
。另外,您还必须确保lambda超时足以重试。
或者,有人知道KPL是否会自动处理此问题吗?对我来说问题?
我不确定KPL,但是从文档看来,它具有自己的重试机制。 https://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-retries-rate-limiting.html