为什么不使用Lambda处理程序在“ GetRecords.IteratorAgeMilliseconds”度量标准中显示Kinesis记录处理延迟

问题描述 投票:1回答:1

我正在尝试Kinesis和Lambda。

即使明显延迟,我也看不到Kinesis的“ GetRecords.IteratorAge”度量标准的延迟。

实验环境如下。

  • Kinesis数据流:1个流,由1个分片组成,而没有增强的扇出。
  • 生产者:它在本地PC上运行以下producer.rb。它每秒记录一次。
  • Consumer:以下lambda_handler.rb在Lambda中执行。它只是将带有时间戳的记录放入DynamoDB表,并在每条记录上休眠3秒。
  • 触发设置:
    • 批量大小:50
    • 批处理窗口:无
    • 每个分块的并行批次:1
    • 最后处理结果:未处理记录
    • 最长记录年龄:604800
    • 重试次数:10000
    • 错误时分批处理:否

producer.rb

require 'aws-sdk'

kinesis = Aws::Kinesis::Client.new(region: 'ap-northeast-1')

COUNT = 300
STREAM_NAME = 'test_stream'
PKEY = 'client-001'

COUNT.times do |i|
  kinesis.put_record(
    stream_name: STREAM_NAME,
    data: (i+1).to_s,
    partition_key: PKEY
  )
  sleep 1
end

lambda_handler.rb

require 'json'
require 'aws-sdk'
require 'base64'

def lambda_handler(event:, context:)
  dynamoDB = Aws::DynamoDB::Resource.new(region: 'ap-northeast-1')
  table = dynamoDB.table(ENV['DYNAMODB_TABLE'])
  item = {
    'aws_request_id' => context.aws_request_id,
    'start' => Time.now.to_s
  }
  event['Records'].each do { sleep 3 }
  item['end'] = Time.now.to_s
  table.put_item({item: item})
  { statusCode: 200 }
end

结果在DynamoDB中看起来像这样,在Cloudwatch中的指标看起来像这样:

它处理了04:09:03和04:24:04之间的记录。为什么即使记录处理没有进行,“ GetRecords.IteratorAge”也不会增加?

enter image description here

enter image description here

ruby aws-lambda amazon-kinesis
1个回答
0
投票

此问题已自行解决。

https://youtu.be/xmacMfbrG28

此视频详细介绍了Lambda的流源处理的内部结构。

“ Poller”订阅分片并通过GetRecords从分片迭代器中获取记录,然后“ Poller”调用前端函数并传递其记录。因此,即使Lambda函数被延迟,GetRecords也不会延迟。

© www.soinside.com 2019 - 2024. All rights reserved.