我正在尝试Kinesis和Lambda。
即使明显延迟,我也看不到Kinesis的“ GetRecords.IteratorAge”度量标准的延迟。
实验环境如下。
producer.rb
。它每秒记录一次。lambda_handler.rb
在Lambda中执行。它只是将带有时间戳的记录放入DynamoDB表,并在每条记录上休眠3秒。producer.rb
require 'aws-sdk'
kinesis = Aws::Kinesis::Client.new(region: 'ap-northeast-1')
COUNT = 300
STREAM_NAME = 'test_stream'
PKEY = 'client-001'
COUNT.times do |i|
kinesis.put_record(
stream_name: STREAM_NAME,
data: (i+1).to_s,
partition_key: PKEY
)
sleep 1
end
lambda_handler.rb
require 'json'
require 'aws-sdk'
require 'base64'
def lambda_handler(event:, context:)
dynamoDB = Aws::DynamoDB::Resource.new(region: 'ap-northeast-1')
table = dynamoDB.table(ENV['DYNAMODB_TABLE'])
item = {
'aws_request_id' => context.aws_request_id,
'start' => Time.now.to_s
}
event['Records'].each do { sleep 3 }
item['end'] = Time.now.to_s
table.put_item({item: item})
{ statusCode: 200 }
end
结果在DynamoDB中看起来像这样,在Cloudwatch中的指标看起来像这样:
它处理了04:09:03和04:24:04之间的记录。为什么即使记录处理没有进行,“ GetRecords.IteratorAge”也不会增加?
此问题已自行解决。
此视频详细介绍了Lambda的流源处理的内部结构。
“ Poller”订阅分片并通过GetRecords从分片迭代器中获取记录,然后“ Poller”调用前端函数并传递其记录。因此,即使Lambda函数被延迟,GetRecords也不会延迟。