我尝试使用 AWS MSK 作为 AWS Firehose 传输流中的源,但在尝试将数据转换到 Firehose 流并将数据转储到 S3 之前,我遇到了死胡同。
首先,根据 AWS 文档,type 参数的有效值为: RecordDeAggregation |拉姆达 |元数据提取 |向记录追加分隔符 |减压。我感兴趣的是 AppendDelimiterToRecord,但是使用正确设置的 AppendDelimiterToRecord 处理器类型部署 Firehose Stream 绝对不会产生任何结果,并且在使用 AWS 控制台部署/编辑 Firehose Stream 时,此选项也不可用。
是我配置错误还是 AWS 太奇怪了?
其次,使用 Lambda 作为处理器会导致以下错误消息,该消息到达 S3 存储桶中的 /processing-failed 前缀,"errorCode":"Lambda.InvalidReturnFormat","errorMessage":"The data field can not be null如果状态为“Ok””。数据字段肯定不为空,因为我可以从 Lamdba 日志中看到它已填充。 Lambda 处理过程中也没有错误。
再说一遍,我是否错过了什么,或者 AWS 是否很奇怪?
P.S 由于公司政策,无法共享任何代码,但我正在使用最新版本的 Terraform 来创建上述所有对象
P.S.S Lambda 是根据 AWS 蓝图创建的,用于 Lambda 与 Kinesis 集成
显然,即使 Firehose 的错误消息明确指出
errorCode":"Lambda.InvalidReturnFormat","errorMessage":"The data field cannot be null if status is Ok"
您必须在 Lambda 的响应负载中填充的实际字段是 kafkaRecordValue(采用 Base64 编码字符串)。 {
"records": [
{"recordId": "123", "result": "Ok", "kafkaRecordValue": "YsAxazaS"},
{"recordId": "123", "result": "Ok", "kafkaRecordValue": "YsAxazaS"},
...
]
}
而不是
{
"records": [
{"recordId": "123", "result": "Ok", "data": "YsAxazaS"},
{"recordId": "123", "result": "Ok", "data": "YsAxazaS"},
...
]
}
P.S 我对 AWS 文档和现有文档的清晰度感到非常失望,老实说,我没有在这里咒骂