Cloudwatch 订阅筛选器未摄取到 Kinesis Data Stream

问题描述 投票:0回答:1

我正在尝试使用订阅过滤器将 Cloudwatch 日志转发到 Kinesis Data Stream。
即使拥有广泛的权限,由于某种原因,我也无法在 Kinesis 中看到任何记录。 我可以在 Cloudwatch 日志组中看到日志,但不会转发到 Kinesis Data Stream。 lambda 是一个基本的,只有几个 console.log 语句。

这是堆栈的 terraform 代码。

resource "aws_iam_role" "sample_lambda_role" {
  name = "sample_lambda_role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_policy" "sample_lambda_policy" {

  name        = "sample_lambda_policy"
  path        = "/"
  description = "AWS IAM Policy for managing aws lambda role"
  policy      = <<EOF
{
 "Version": "2012-10-17",
 "Statement": [
   {
     "Action": [
       "logs:CreateLogGroup",
       "logs:CreateLogStream",
       "logs:PutLogEvents"
     ],
     "Resource": "arn:aws:logs:*:*:*",
     "Effect": "Allow"
   }
 ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "sample_lambda_attach_iam_policy_to_iam_role" {
  role       = aws_iam_role.sample_lambda_role.name
  policy_arn = aws_iam_policy.sample_lambda_policy.arn
}

data "archive_file" "lambda_app_zip" {
  type       = "zip"
  source_dir = "${path.module}/sample-lambda"
  #source_file = "index.js" #if one file
  output_path = "${path.module}/sample-lambda.zip"
}

resource "aws_lambda_function" "sample_lambda" {
  filename         = "${path.module}/sample-lambda.zip"
  function_name    = "sample-lambda"
  role             = aws_iam_role.sample_lambda_role.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_app_zip.output_base64sha256
  runtime          = "nodejs14.x"
  depends_on = [
    aws_iam_role_policy_attachment.sample_lambda_attach_iam_policy_to_iam_role
  ]
}

resource "aws_cloudwatch_log_group" "sample_lambda_function_log_group" {
  name              = "/aws/lambda/${aws_lambda_function.sample_lambda.function_name}"
  retention_in_days = 1
  lifecycle {
    prevent_destroy = false
  }
}

resource "aws_kinesis_stream" "log_stream" {
  name             = "terraform-kinesis-test"
  shard_count      = 1
  retention_period = 24

  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
  ]

}

resource "aws_iam_role" "cloudwatch_ingestion_role" {
  name = "cloudwatch_ingestion_role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": [
          "logs.amazonaws.com"
        ]
      },
      "Effect": "Allow",
      "Sid": "",
      "Condition": { 
        "StringLike": { "aws:SourceArn": "arn:aws:logs:*:*:*" } 
      }
    }
  ]
}
EOF
}

resource "aws_iam_policy" "cloudwatch_ingestion_policy" {
  name        = "cloudwatch_ingestion_policy"
  path        = "/"
  description = "AWS IAM Policy for cloudwatch logs ingestion"
  policy      = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "kinesis:*"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/*",
      "Effect": "Allow"
    }
  ]
}
EOF
}


resource "aws_iam_role_policy_attachment" "cloudwatch_ingestion_attach_iam_policy_to_iam_role" {
  role       = aws_iam_role.cloudwatch_ingestion_role.name
  policy_arn = aws_iam_policy.cloudwatch_ingestion_policy.arn
}

resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter" {
  name     = "sample_lambda_function_logfilter"
  role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  //filter_pattern = "logtype test"
  filter_pattern  = "" //WILL THIS WORK?
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}
terraform amazon-cloudwatch amazon-kinesis
1个回答
0
投票

我注意到评论部分中围绕

filter_pattern
值的讨论,因此我在订阅过滤器模式值中进行了带空格和不带空格的实验,并观察到两种过滤器模式的行为相同。

这是我使用的 Terraform 脚本:

resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
  name     = "sample_lambda_function_logfilter_with_space"
  role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  filter_pattern  = " "
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}

resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
  name     = "sample_lambda_function_logfilter_without_space"
  role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  filter_pattern  = ""
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}

Terraform 计划输出:

  # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_with_space will be created
  + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
      + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
      + distribution    = "ByLogStream"
      + filter_pattern  = " "
      + id              = (known after apply)
      + log_group_name  = "/aws/lambda/sample-lambda"
      + name            = "sample_lambda_function_logfilter_with_space"
      + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
    }

  # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_without_space will be created
  + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
      + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
      + distribution    = "ByLogStream"
      + id              = (known after apply)
      + log_group_name  = "/aws/lambda/sample-lambda"
      + name            = "sample_lambda_function_logfilter_without_space"
      + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
    }

注意,如果我们传递一个没有空间值的过滤器模式,Terraform 在规划阶段不会分配过滤器模式值。但是,Kinesis 将此订阅过滤器视为与其他过滤器相同。

在 AWS 控制台中,如图所示,两个订阅过滤器呈现相同的模式值:

因此,我们可以排除与上述 Terraform 脚本中的订阅过滤器模式值有关的任何混淆。


那么,现在可能出现什么问题呢?我的主要怀疑是您过滤 Kinesis 数据流上的记录的方式是错误的。我怀疑您在数据查看器中获取记录时可能使用了

Latest
ShardIteratorType(起始位置下拉列表)。

Latest
ShardIteratorType 显示分片中最新记录之后的记录。考虑到您的操作顺序,您可能首先执行了 lambda 函数,然后尝试在数据查看器中过滤记录。由于这些操作之间存在时间滞后,当您尝试使用最新起始位置过滤记录时,Kinesis 会在最近发布的 CloudWatch 日志后生成一个数据指针,这就是您在 Kinesis 上看不到任何记录的原因.


AWS CLI 命令

要使用

Latest
ShardIteratorType 获取记录,请按照以下步骤操作:

首先,执行以下命令。这将在分片中存在的最新记录之后创建一个数据指针:

aws kinesis get-shard-iterator \
    --stream-name terraform-kinesis-test \
    --shard-id shardId-000000000000 \
    --shard-iterator-type LATEST

命令输出:

{
    "ShardIterator": "AAAAAAAAAAGiKQ..."
}

现在,您可以执行 lambda 函数来生成一些 CloudWatch 日志。然后,日志将通过新创建的订阅过滤器发送到 Kinesis。

接下来,执行以下命令,使用您之前检索到的分片迭代器值从 Kinesis 数据流中获取记录。

aws kinesis get-records \
    --limit 10 \
    --shard-iterator "AAAAAAAAAAGiKQ..."

命令输出:

{
    "Records": [
        {
            "SequenceNumber": "49643477757265957414492357197584820922864438932158808066",
            "ApproximateArrivalTimestamp": "2023-08-10T23:43:56.704000+00:00",
            "Data": "H4sIAAAAAAAA/...",
            "PartitionKey": "f656f4eedc671f9bd3cea60ef85e599c"
        },
   ],
    "NextShardIterator": "AAAAAAAAAAFOa...",
    "MillisBehindLatest": 0
}

您在记录部分下看到的

Data
字段是base64编码和GZIP压缩的,它有一个CloudWatch日志,因此请使用以下命令检索实际值。

echo -n "<BASE64ENCODED_GZIP_COMPRESSED_DATA>" | base64 -d | zcat

上述步骤将帮助您通过 AWS CLI 使用

Latest
ShardIteratorType 类型检索记录。不过,如果您打算直接查看 AWS 控制台的数据查看器部分中的记录,则可以使用替代方案 ShardIteratorTypes

例如,当使用

TRIM_HORIZON
起始位置时,数据将如下图所示:

要了解有关 ShardIteratorTypes 的更多信息,请参阅 this 链接。

© www.soinside.com 2019 - 2024. All rights reserved.