Reformatting CloudWatch Logs sent to S3 via Kinesis Firehose as proper JSON

Question · votes: 0 · answers: 1

I have a working setup that sends CloudWatch Logs to an S3 bucket via Kinesis Firehose. Unfortunately, the files in S3 do not contain well-formed JSON. A properly formatted array of JSON objects would look like this: [{}, {}, {}]. However, the S3 files that Firehose creates look like this: {}{}{}. I tried to modify the Kinesis Firehose data-transformation Lambda blueprint by adding square brackets at the beginning and end and commas between the JSON objects. The Lambda blueprint I modified is called "Process CloudWatch logs sent to Kinesis Firehose". Here are the relevant parts:

import base64
import json
import gzip
import boto3


def transformLogEvent(log_event):
    """Transform each log event.

    The default implementation below just extracts the message and appends a newline to it.

    Args:
    log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

    Returns:
    str: The transformed log event.
    """
    return log_event['message'] + ',\n'


def processRecords(records):
    for r in records:
        data = loadJsonGzipBase64(r['data'])
        recId = r['recordId']
        # CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        # They do not contain actual data.
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])
            dataBytes = joinedData.encode("utf-8")
            encodedData = base64.b64encode(dataBytes).decode('utf-8')
            yield {
                'data': encodedData,
                'result': 'Ok',
                'recordId': recId
            }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }

def loadJsonGzipBase64(base64Data):
    return json.loads(gzip.decompress(base64.b64decode(base64Data)))


def lambda_handler(event, context):
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    recordListsToReingest = []

    for idx, rec in enumerate(records):
        originalRecord = event['records'][idx]

        if rec['result'] != 'Ok':
            continue

        # If a single record is too large after processing, split the original CWL data into two, each containing half
        # the log events, and re-ingest both of them (note that it is the original data that is re-ingested, not the 
        # processed data). If it's not possible to split because there is only one log event, then mark the record as
        # ProcessingFailed, which sends it to error output.
        if len(rec['data']) > 6000000:
            cwlRecord = loadJsonGzipBase64(originalRecord['data'])
            if len(cwlRecord['logEvents']) > 1:
                rec['result'] = 'Dropped'
                recordListsToReingest.append(
                    [createReingestionRecord(isSas, originalRecord, data) for data in splitCWLRecord(cwlRecord)])
            else:
                rec['result'] = 'ProcessingFailed'
                print(('Record %s contains only one log event but is still too large after processing (%d bytes), ' +
                    'marking it as %s') % (rec['recordId'], len(rec['data']), rec['result']))
            del rec['data']
        else:
            projectedSize += len(rec['data']) + len(rec['recordId'])
            # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
            if projectedSize > 6000000:
                recordListsToReingest.append([createReingestionRecord(isSas, originalRecord)])
                del rec['data']
                rec['result'] = 'Dropped'

    # call putRecordBatch/putRecords for each group of up to 500 records to be re-ingested
    if recordListsToReingest:
        recordsReingestedSoFar = 0
        client = boto3.client('kinesis' if isSas else 'firehose', region_name=region)
        maxBatchSize = 500
        flattenedList = [r for sublist in recordListsToReingest for r in sublist]
        for i in range(0, len(flattenedList), maxBatchSize):
            recordBatch = flattenedList[i:i + maxBatchSize]
            # last argument is maxAttempts
            args = [streamName, recordBatch, client, 0, 20]
            if isSas:
                putRecordsToKinesisStream(*args)
            else:
                putRecordsToFirehoseStream(*args)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d' % (recordsReingestedSoFar, len(flattenedList)))

    print('%d input records, %d returned as Ok or ProcessingFailed, %d split and re-ingested, %d re-ingested as-is' % (
        len(event['records']),
        len([r for r in records if r['result'] != 'Dropped']),
        len([l for l in recordListsToReingest if len(l) > 1]),
        len([l for l in recordListsToReingest if len(l) == 1])))
    
    # encapsulate in square brackets for proper JSON formatting
    last = len(event['records'])-1
    start = '['+base64.b64decode(records[0]['data']).decode('utf-8')
    end = base64.b64decode(records[last]['data']).decode('utf-8')+']'
    
    records[0]['data'] = base64.b64encode(start.encode('utf-8'))
    records[last]['data'] = base64.b64encode(end.encode('utf-8'))

    return {'records': records}


I added the comma earlier in the transformLogEvent method, and added the square brackets in these lines at the end of the lambda_handler method:

last = len(event['records'])-1
start = '['+base64.b64decode(records[0]['data']).decode('utf-8')
end = base64.b64decode(records[last]['data']).decode('utf-8')+']'
    
records[0]['data'] = base64.b64encode(start.encode('utf-8'))
records[last]['data'] = base64.b64encode(end.encode('utf-8'))

I get this error when testing the Lambda function:

{
  "errorMessage": "'data'",
  "errorType": "KeyError",
  "requestId": "04ac151c-c429-484d-813f-ffd5d65286e2",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 270, in lambda_handler\n    start = '['+base64.b64decode(records[0]['data']).decode('utf-8')\n"
  ]
}

I think I am referencing the Python list incorrectly. I would like to know how to fix this error, and/or whether anyone has a better solution to the JSON formatting problem, keeping in mind that CloudWatch Logs events are sent to Kinesis Data Firehose compressed with gzip.

python amazon-web-services amazon-s3 aws-lambda amazon-cloudwatch
1 Answer

0 votes

It fails because your processRecords() function can return records that have no 'data' key. Specifically, if an input record is a CONTROL_MESSAGE, or if it is neither a CONTROL_MESSAGE nor a DATA_MESSAGE, the function yields a dict without a 'data' key. If that is the case for the first or last record (the first record in your example), the Python script throws the exception you are seeing.
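A minimal sketch of one way to avoid the KeyError, assuming you keep the bracket-wrapping approach: instead of blindly indexing the first and last entries of the list, only wrap the first and last records that still carry a 'data' key (i.e. whose result is 'Ok'). The okIdx/first/last names below are illustrative, not part of the blueprint, and the lines are meant to replace the bracket-wrapping block at the end of lambda_handler:

# Only wrap records that still have a 'data' key (result == 'Ok').
okIdx = [i for i, r in enumerate(records) if r.get('result') == 'Ok' and 'data' in r]
if okIdx:
    first, last = okIdx[0], okIdx[-1]
    firstText = base64.b64decode(records[first]['data']).decode('utf-8')
    lastText = base64.b64decode(records[last]['data']).decode('utf-8')
    if first == last:
        # A single surviving record gets both brackets.
        records[first]['data'] = base64.b64encode(('[' + firstText + ']').encode('utf-8')).decode('utf-8')
    else:
        records[first]['data'] = base64.b64encode(('[' + firstText).encode('utf-8')).decode('utf-8')
        records[last]['data'] = base64.b64encode((lastText + ']').encode('utf-8')).decode('utf-8')

Note that even with this guard, the ',\n' appended in transformLogEvent leaves a trailing comma before the closing bracket, so the output may still not parse as strict JSON.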
