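I am trying to forward CloudWatch Logs to a Kinesis Data Stream using a subscription filter.
Even with broad permissions, I don't see any records in Kinesis.
I can see the logs in the CloudWatch log group, but they are not forwarded to the Kinesis Data Stream. The Lambda is a basic one with only a few console.log statements.
Here is the Terraform code for the stack: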
resource "aws_iam_role" "sample_lambda_role" {
  name               = "sample_lambda_role"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}
resource "aws_iam_policy" "sample_lambda_policy" {
  name        = "sample_lambda_policy"
  path        = "/"
  description = "AWS IAM Policy for managing aws lambda role"
  policy      = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*",
      "Effect": "Allow"
    }
  ]
}
EOF
}
resource "aws_iam_role_policy_attachment" "sample_lambda_attach_iam_policy_to_iam_role" {
  role       = aws_iam_role.sample_lambda_role.name
  policy_arn = aws_iam_policy.sample_lambda_policy.arn
}
data "archive_file" "lambda_app_zip" {
  type        = "zip"
  source_dir  = "${path.module}/sample-lambda"
  #source_file = "index.js" #if one file
  output_path = "${path.module}/sample-lambda.zip"
}
resource "aws_lambda_function" "sample_lambda" {
  filename         = "${path.module}/sample-lambda.zip"
  function_name    = "sample-lambda"
  role             = aws_iam_role.sample_lambda_role.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_app_zip.output_base64sha256
  runtime          = "nodejs14.x"
  depends_on = [
    aws_iam_role_policy_attachment.sample_lambda_attach_iam_policy_to_iam_role
  ]
}
resource "aws_cloudwatch_log_group" "sample_lambda_function_log_group" {
  name              = "/aws/lambda/${aws_lambda_function.sample_lambda.function_name}"
  retention_in_days = 1
  lifecycle {
    prevent_destroy = false
  }
}
resource "aws_kinesis_stream" "log_stream" {
  name             = "terraform-kinesis-test"
  shard_count      = 1
  retention_period = 24
  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
  ]
}
resource "aws_iam_role" "cloudwatch_ingestion_role" {
  name               = "cloudwatch_ingestion_role"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": [
          "logs.amazonaws.com"
        ]
      },
      "Effect": "Allow",
      "Sid": "",
      "Condition": {
        "StringLike": { "aws:SourceArn": "arn:aws:logs:*:*:*" }
      }
    }
  ]
}
EOF
}
resource "aws_iam_policy" "cloudwatch_ingestion_policy" {
  name        = "cloudwatch_ingestion_policy"
  path        = "/"
  description = "AWS IAM Policy for cloudwatch logs ingestion"
  policy      = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "kinesis:*"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/*",
      "Effect": "Allow"
    }
  ]
}
EOF
}
resource "aws_iam_role_policy_attachment" "cloudwatch_ingestion_attach_iam_policy_to_iam_role" {
  role       = aws_iam_role.cloudwatch_ingestion_role.name
  policy_arn = aws_iam_policy.cloudwatch_ingestion_policy.arn
}
resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter" {
  name            = "sample_lambda_function_logfilter"
  role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  //filter_pattern = "logtype test"
  filter_pattern  = "" //WILL THIS WORK?
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}
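I noticed the discussion in the comments about the filter_pattern value, so I tried subscription filter patterns both with and without a space and observed the same behavior for both.
Here is the Terraform script I used: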
resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
  name            = "sample_lambda_function_logfilter_with_space"
  role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  filter_pattern  = " "
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}
resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
  name            = "sample_lambda_function_logfilter_without_space"
  role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  filter_pattern  = ""
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}
Terraform plan output:
  # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_with_space will be created
  + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
      + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
      + distribution    = "ByLogStream"
      + filter_pattern  = " "
      + id              = (known after apply)
      + log_group_name  = "/aws/lambda/sample-lambda"
      + name            = "sample_lambda_function_logfilter_with_space"
      + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
    }
  # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_without_space will be created
  + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
      + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
      + distribution    = "ByLogStream"
      + id              = (known after apply)
      + log_group_name  = "/aws/lambda/sample-lambda"
      + name            = "sample_lambda_function_logfilter_without_space"
      + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
    }
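Note that when the filter pattern is an empty string (no space), Terraform does not show a filter_pattern value in the plan. AWS nevertheless treats this subscription filter the same as the other one.
In the AWS console, as the screenshot shows, both subscription filters display the same pattern value.
So we can rule out the subscription filter pattern in the Terraform script above as the source of the problem.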
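If you would rather confirm this from the command line than from the console, a minimal sketch along these lines should list each filter with its pattern and destination. It assumes the AWS CLI is installed and configured with credentials for the account; the function name list_subscription_filters is just an illustrative helper, not something from the original stack.

```shell
# Hypothetical helper: list the subscription filters on a log group,
# printing name, pattern, and destination ARN for each filter.
list_subscription_filters() {
  log_group="$1"
  aws logs describe-subscription-filters \
    --log-group-name "$log_group" \
    --query 'subscriptionFilters[].[filterName, filterPattern, destinationArn]' \
    --output text
}

# Usage (needs AWS credentials):
#   list_subscription_filters "/aws/lambda/sample-lambda"
```

Both filters should appear with the Kinesis stream ARN as their destination, which is a quick way to check that the subscription itself was created.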
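So what could be going wrong? My main suspicion is the way you are looking for records in the Kinesis data stream. I suspect you used the Latest ShardIteratorType (the "Starting position" dropdown) when fetching records in the data viewer. The Latest ShardIteratorType returns only records that arrive after the most recent record already in the shard. Given your sequence of actions, you probably invoked the Lambda function first and then tried to fetch records in the data viewer. Because of the time lag between those two actions, by the time you fetch with the Latest starting position, Kinesis places the iterator just after the CloudWatch log records that were already delivered, which is why you don't see any records in Kinesis.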
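AWS CLI commands
To fetch records using the LATEST ShardIteratorType, follow these steps.
First, run the following command. It creates an iterator positioned just after the most recent record in the shard: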
aws kinesis get-shard-iterator \
  --stream-name terraform-kinesis-test \
  --shard-id shardId-000000000000 \
  --shard-iterator-type LATEST
Command output:
{
  "ShardIterator": "AAAAAAAAAAGiKQ..."
}
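Now invoke the Lambda function to generate some CloudWatch logs. The logs are then sent to Kinesis through the newly created subscription filter.
Next, run the following command to fetch records from the Kinesis data stream, using the shard iterator value you retrieved earlier. A shard iterator expires five minutes after it is issued, so do not wait too long between the two commands.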
aws kinesis get-records \
  --limit 10 \
  --shard-iterator "AAAAAAAAAAGiKQ..."
Command output:
{
  "Records": [
    {
      "SequenceNumber": "49643477757265957414492357197584820922864438932158808066",
      "ApproximateArrivalTimestamp": "2023-08-10T23:43:56.704000+00:00",
      "Data": "H4sIAAAAAAAA/...",
      "PartitionKey": "f656f4eedc671f9bd3cea60ef85e599c"
    }
  ],
  "NextShardIterator": "AAAAAAAAAAFOa...",
  "MillisBehindLatest": 0
}
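The Data field you see under Records is base64-encoded and GZIP-compressed, and it contains the CloudWatch log payload, so use the following command to recover the actual value: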
echo -n "<BASE64ENCODED_GZIP_COMPRESSED_DATA>" | base64 -d | zcat
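To avoid copying each Data value by hand, the decoding step can be wrapped in a small helper and applied to every record that get-records returns. This is only a sketch: the function names decode_cwl_data and decode_records are made up for illustration, and it assumes the AWS CLI prints the list of Data values as tab-separated text when `--output text` is used.

```shell
# Hypothetical helper: decode one Data value (base64 -> gzip -> JSON text).
decode_cwl_data() {
  printf '%s' "$1" | base64 -d | gunzip -c
}

# Hypothetical helper: fetch records for a shard iterator and decode
# the Data field of each one, printing one JSON document per line.
decode_records() {
  shard_iterator="$1"
  aws kinesis get-records \
    --shard-iterator "$shard_iterator" \
    --query 'Records[].Data' \
    --output text |
    tr '\t' '\n' |
    while IFS= read -r data; do
      if [ -n "$data" ]; then
        decode_cwl_data "$data"
        echo
      fi
    done
}

# Usage (needs AWS credentials and a fresh iterator):
#   decode_records "AAAAAAAAAAGiKQ..."
```

The decoded payload is the JSON document that CloudWatch Logs sends to the destination, with fields such as logGroup, logStream, and logEvents.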
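The steps above retrieve records through the AWS CLI using the LATEST ShardIteratorType. If you would rather view the records directly in the data viewer of the AWS console, you can use one of the other ShardIteratorTypes.
For example, with the TRIM_HORIZON starting position, the data appears as shown in the image below: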
To learn more about ShardIteratorTypes, see the AWS documentation for the Kinesis GetShardIterator API.
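The same TRIM_HORIZON approach also works from the CLI, and it removes the timing problem because the iterator starts at the oldest record still retained in the shard. The sketch below reuses the stream name and shard ID from the commands above; the function name read_from_trim_horizon is a hypothetical helper.

```shell
# Hypothetical helper: read up to 10 records starting from the oldest
# record still retained in the shard (TRIM_HORIZON).
read_from_trim_horizon() {
  stream_name="$1"
  shard_id="$2"

  # Ask for an iterator at the start of the shard and keep only its value.
  shard_iterator=$(aws kinesis get-shard-iterator \
    --stream-name "$stream_name" \
    --shard-id "$shard_id" \
    --shard-iterator-type TRIM_HORIZON \
    --query 'ShardIterator' \
    --output text)

  # Fetch records from that position.
  aws kinesis get-records \
    --limit 10 \
    --shard-iterator "$shard_iterator"
}

# Usage (needs AWS credentials):
#   read_from_trim_horizon terraform-kinesis-test shardId-000000000000
```

With this starting position it does not matter whether the Lambda ran before or after the iterator was created, as long as the records are still within the stream's retention period (24 hours in the Terraform above).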