在登陆 S3 之前从 AWS MSK 到 Firehose 记录转换

问题描述 投票:0回答:1

我尝试使用 AWS MSK 作为 AWS Firehose 传输流中的源,但在尝试将数据转换到 Firehose 流并将数据转储到 S3 之前,我遇到了死胡同。
首先,根据 AWS 文档type 参数的有效值为: RecordDeAggregation |拉姆达 |元数据提取 |向记录追加分隔符 |减压。我感兴趣的是 AppendDelimiterToRecord,但是使用正确设置的 AppendDelimiterToRecord 处理器类型部署 Firehose Stream 绝对不会产生任何结果,并且在使用 AWS 控制台部署/编辑 Firehose Stream 时,此选项也不可用。
是我配置错误还是 AWS 太奇怪了?
其次,使用 Lambda 作为处理器会导致以下错误消息,该消息到达 S3 存储桶中的 /processing-failed 前缀,"errorCode":"Lambda.InvalidReturnFormat","errorMessage":"The data field can not be null如果状态为“Ok””。数据字段肯定不为空,因为我可以从 Lamdba 日志中看到它已填充。 Lambda 处理过程中也没有错误。
再说一遍,我是否错过了什么,或者 AWS 是否很奇怪?

P.S 由于公司政策,无法共享任何代码,但我正在使用最新版本的 Terraform 来创建上述所有对象
P.S.S Lambda 是根据 AWS 蓝图创建的,用于 Lambda 与 Kinesis 集成

amazon-web-services amazon-kinesis-firehose aws-msk
1个回答
0
投票

显然,即使 Firehose 的错误消息明确指出

errorCode":"Lambda.InvalidReturnFormat","errorMessage":"The data field cannot be null if status is Ok"
您必须在 Lambda 的响应负载中填充的实际字段是 kafkaRecordValue(采用 Base64 编码字符串)。
所以完整的有效负载将是

{
  "records": [
    {"recordId": "123", "result": "Ok", "kafkaRecordValue": "YsAxazaS"},
    {"recordId": "123", "result": "Ok", "kafkaRecordValue": "YsAxazaS"},
    ...
  ]
}

而不是

{
  "records": [
    {"recordId": "123", "result": "Ok", "data": "YsAxazaS"},
    {"recordId": "123", "result": "Ok", "data": "YsAxazaS"},
    ...
  ]
}

P.S 我对 AWS 文档和现有文档的清晰度感到非常失望,老实说,我没有在这里咒骂

© www.soinside.com 2019 - 2024. All rights reserved.