我正在尝试从 Kinesis Firehose 发送一个非云监控事件到 Splunk。我正在用 Lambda 处理事件,并以下面的格式将其反馈到 Firehose 中 (消防管所需):
{
"records": [
{
"recordId": "2345678",
"result": "Ok",
"data": [base64-encoded custom JSON]
}
]
}
但是,一旦到了Splunk,它就会抛出一个模糊的解析错误,还有一个无处可去的帮助链接。
"errorCode":"Splunk.InvalidDataFormat","errorMessage":"The data is not formatted correctly. To see how to properly format data for Raw or Event HEC endpoints, see Splunk Event Data (http://dev.splunk.com/view/event-collector/SP-CAAAE6P#data)"
我到底漏了什么?HEC端点无法以标准格式解析来自Firehose的消息,这似乎很奇怪。
我正在将消息发送到一个HEC事件端点,使用splunk_configuration块中的以下内容 aws_kinesis_firehose_delivery_stream Terraform模块。.
想到了! 为了方便后人,因为这不是一个很好的文档。
你的... data
字段需要是一个base64编码的对象,它遵循了 Splunk事件收集器规格.
只要 两者 Firehose和Splunk可以读取Lambda返回的有效载荷,它不应该抛出一个错误。
下面是Kinesis Firehose变换器Lambda的代码(node12运行时)。
/*
* Transformer for sending Kinesis Firehose events to Splunk
*
* Properly formats incoming messages for Splunk ingestion
* Returned object gets fed back into Kinesis Firehose and sent to Splunk
*/
'use strict';
console.log('Loading function');
exports.handler = (event, context, callback) => {
let success = 0; // Number of valid entries found
let failure = 0; // Number of invalid entries found
let dropped = 0; // Number of dropped entries
/* Process the list of records and transform them to adhere to Splunk specs */
const output = event.records.map((record) => {
try {
const entry = (Buffer.from(record.data, 'base64')).toString('utf8');
/*
* IMPORTANT: `data` object should follow Splunk event formatting specs prior to encoding.
* Otherwise, it will throw a parsing error.
* https://docs.splunk.com/Documentation/Splunk/8.0.3/Data/FormateventsforHTTPEventCollector
*/
const obj = {
sourcetype: "aws:firehose:json", // Required, will error
event: JSON.parse(entry)
}
const payload = (Buffer.from(JSON.stringify(obj), 'utf8')).toString('base64');
success++;
return {
recordId: record.recordId,
result: 'Ok',
data: payload,
};
} catch (e) {
failure++
console.error(e.message());
return {
recordId: record.recordId,
result: 'ProcessingFailed'
};
}
});
console.log(`Processing completed. Successful records ${success}. Failed records ${failure}.`);
callback(null, {records: output});
}