我如何从 Kinesis Firehose 向 Splunk 发送非云监控 JSON 事件?

问题描述 投票:0回答:1

我正在尝试从 Kinesis Firehose 发送一个非云监控事件到 Splunk。我正在用 Lambda 处理事件,并以下面的格式将其反馈到 Firehose 中 (消防管所需):

{ 
    "records": [
        {
          "recordId": "2345678",
          "result": "Ok",
          "data": [base64-encoded custom JSON]
        }
    ]
}

但是,一旦到了Splunk,它就会抛出一个模糊的解析错误,还有一个无处可去的帮助链接。

"errorCode":"Splunk.InvalidDataFormat","errorMessage":"The data is not formatted correctly. To see how to properly format data for Raw or Event HEC endpoints, see Splunk Event Data (http://dev.splunk.com/view/event-collector/SP-CAAAE6P#data)"

我到底漏了什么?HEC端点无法以标准格式解析来自Firehose的消息,这似乎很奇怪。

我正在将消息发送到一个HEC事件端点,使用splunk_configuration块中的以下内容 aws_kinesis_firehose_delivery_stream Terraform模块。.

json parsing terraform splunk amazon-kinesis-firehose
1个回答
0
投票

想到了! 为了方便后人,因为这不是一个很好的文档。

你的... data 字段需要是一个base64编码的对象,它遵循了 Splunk事件收集器规格.

只要 两者 Firehose和Splunk可以读取Lambda返回的有效载荷,它不应该抛出一个错误。

下面是Kinesis Firehose变换器Lambda的代码(node12运行时)。

/*
* Transformer for sending Kinesis Firehose events to Splunk
*
* Properly formats incoming messages for Splunk ingestion
* Returned object gets fed back into Kinesis Firehose and sent to Splunk
*/

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries

    /* Process the list of records and transform them to adhere to Splunk specs */
    const output = event.records.map((record) => {
        try {
            const entry = (Buffer.from(record.data, 'base64')).toString('utf8');

            /*
             * IMPORTANT: `data` object should follow Splunk event formatting specs prior to encoding.
             * Otherwise, it will throw a parsing error.
             * https://docs.splunk.com/Documentation/Splunk/8.0.3/Data/FormateventsforHTTPEventCollector
             */
            const obj = {
                sourcetype: "aws:firehose:json", // Required, will error
                event: JSON.parse(entry)
            }
            const payload = (Buffer.from(JSON.stringify(obj), 'utf8')).toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        } catch (e) {
            failure++
            console.error(e.message());
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed'
            };
        }
    });
    console.log(`Processing completed.  Successful records ${success}. Failed records ${failure}.`);
    callback(null, {records: output});
}
© www.soinside.com 2019 - 2024. All rights reserved.