Kinesis Firehose 将 JSON 对象放入 S3 中,不使用分隔符逗号

问题描述 投票:0回答:5

在发送数据之前,我对数据使用 JSON.stringify,它看起来像这样

{"data": [{"key1": value1, "key2": value2}, {"key1": value1, "key2": value2}]}

但是一旦它通过 AWS API Gateway 并且 Kinesis Firehose 将其放入 S3,它看起来像这样

    {
     "key1": value1, 
     "key2": value2
    }{
     "key1": value1, 
     "key2": value2
    }

JSON 对象之间的分隔符逗号消失了,但我需要它来正确处理数据。

API网关中的模板:

#set($root = $input.path('$'))
{
    "DeliveryStreamName": "some-delivery-stream",
    "Records": [
#foreach($r in $root.data)
#set($data = "{
    ""key1"": ""$r.value1"",
    ""key2"": ""$r.value2""
}")
    {
        "Data": "$util.base64Encode($data)"
    }#if($foreach.hasNext),#end
#end
    ]
}
json amazon-web-services aws-api-gateway amazon-kinesis amazon-kinesis-firehose
5个回答
11
投票

我最近遇到了同样的问题,我能找到的唯一答案基本上只是添加换行符(“ ") 到每个 JSON 消息的末尾,每当您将它们发布到 Kinesis 流时,或者使用某种可以处理没有分隔符的串联 JSON 对象的原始 JSON 解码器方法。

我发布了一个Python代码解决方案,可以在相关的Stack Overflow帖子上找到: https://stackoverflow.com/a/49417680/1546785


2
投票

您可以考虑的一种方法是通过添加 Lambda 函数作为其数据处理器来为 Kinesis Firehose 传输流配置数据处理,该函数将在最终将数据传输到 S3 存储桶之前执行。

DeliveryStream:
  ...
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      ...
      BucketARN: !GetAtt MyDeliveryBucket.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !GetAtt MyTransformDataLambdaFunction.Arn
            Type: Lambda
    ...

并且在 Lambda 函数中,确保将

'\n'
附加到记录的 JSON 字符串,请参阅下面 Node.js 中的 Lambda 函数
myTransformData.ts

import {
  FirehoseTransformationEvent,
  FirehoseTransformationEventRecord,
  FirehoseTransformationHandler,
  FirehoseTransformationResult,
  FirehoseTransformationResultRecord,
} from 'aws-lambda';

const createDroppedRecord = (
  recordId: string
): FirehoseTransformationResultRecord => {
  return {
    recordId,
    result: 'Dropped',
    data: Buffer.from('').toString('base64'),
  };
};

const processData = (
  payloadStr: string,
  record: FirehoseTransformationEventRecord
) => {
  let jsonRecord;
  // ...
  // Process the orginal payload,
  // And create the record in JSON
  return jsonRecord;
};

const transformRecord = (
  record: FirehoseTransformationEventRecord
): FirehoseTransformationResultRecord => {
  try {
    const payloadStr = Buffer.from(record.data, 'base64').toString();
    const jsonRecord = processData(payloadStr, record);
    if (!jsonRecord) {
      console.error('Error creating json record');
      return createDroppedRecord(record.recordId);
    }
    return {
      recordId: record.recordId,
      result: 'Ok',
      // Ensure that '\n' is appended to the record's JSON string.
      data: Buffer.from(JSON.stringify(jsonRecord) + '\n').toString('base64'),
    };
  } catch (error) {
    console.error('Error processing record ${record.recordId}: ', error);
    return createDroppedRecord(record.recordId);
  }
};

const transformRecords = (
  event: FirehoseTransformationEvent
): FirehoseTransformationResult => {
  let records: FirehoseTransformationResultRecord[] = [];
  for (const record of event.records) {
    const transformed = transformRecord(record);
    records.push(transformed);
  }
  return { records };
};

export const handler: FirehoseTransformationHandler = async (
  event,
  _context
) => {
  const transformed = transformRecords(event);
  return transformed;
};

一旦换行符就位,Athena 等 AWS 服务将能够正确处理 S3 存储桶中的 JSON 记录数据,而不是仅看到第一个 JSON 记录


0
投票

AWS Firehose 将 JSON 对象转储到 s3 后,就完全可以从文件中读取各个 JSON 对象。

使用 Python,您可以使用

raw_decode
包中的
json
函数

from json import JSONDecoder, JSONDecodeError
import re
import json
import boto3

NOT_WHITESPACE = re.compile(r'[^\s]')

def decode_stacked(document, pos=0, decoder=JSONDecoder()):
    while True:
        match = NOT_WHITESPACE.search(document, pos)
        if not match:
            return
        pos = match.start()

        try:
            obj, pos = decoder.raw_decode(document, pos)
        except JSONDecodeError:
            # do something sensible if there's some error
            raise
        yield obj

s3 = boto3.resource('s3')

obj = s3.Object("my-bukcet", "my-firehose-json-key.json")
file_content = obj.get()['Body'].read()
for obj in decode_stacked(file_content):
    print(json.dumps(obj))
    #  { "key1":value1,"key2":value2}
    #  { "key1":value1,"key2":value2}

来源:https://stackoverflow.com/a/50384432/1771155

使用Glue / Pyspark你可以使用

import json

rdd = sc.textFile("s3a://my-bucket/my-firehose-file-containing-json-objects")
df = rdd.map(lambda x: json.loads(x)).toDF()
df.show()

来源:https://stackoverflow.com/a/62984450/1771155


0
投票

这基本上是故意的。默认情况下,redshift copy 和 Athena serde 无法处理 json 文件中的外部数组。对于 Athena/spectrum,您必须指定 ': 'strip.outer.array'='true'',对于 redshift 复制,除了使用 'noshred' 将整个文件作为单行读取之外,没有其他办法,并且当总的 json 超过 16mb。如果你只是在其中插入逗号而没有外部数组括号,那么无论如何它都是无效的 json


-1
投票

请使用此代码来解决您的问题


__Author__ = "Soumil Nitin Shah"
import json
import boto3
import base64


class MyHasher(object):
    def __init__(self, key):
        self.key = key

    def get(self):
        keys = str(self.key).encode("UTF-8")
        keys = base64.b64encode(keys)
        keys = keys.decode("UTF-8")
        return keys

def lambda_handler(event, context):

    output = []
    for record in event['records']:

        payload = base64.b64decode(record['data'])

        """Get the payload from event bridge and just get data attr """""
        serialize_payload = str(json.loads(payload)) + "\n"
        hasherHelper = MyHasher(key=serialize_payload)
        hash = hasherHelper.get()

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': hash
        }
        print("output_record", output_record)

        output.append(output_record)

    return {'records': output}







© www.soinside.com 2019 - 2024. All rights reserved.