Kinesis DLQ 不包含有效负载,仅包含元数据。您应该通过调用 GetRecords
来读取有效负载
就像在这个 SO 问题中。
我的设置如下:
Apps --> Kinesis Data Stream ---> Kinesis Firehose ---> S3 [all events]
| ---> Processing Lambda ---> <external service>
| |
| |
DLQ Lambda <-------------------- DLQ
|
---> S3 [failed events]
基本上:
不幸的是,
GetRecords
每个分片每秒只能调用 5 次。您也无法将 Kinesis Firehose 设置为使用 EFO,这又意味着 Firehose 和 DLQ Lambda 共享容量。
现在,当故障大量增加时(多个分片、较大的并行化因子和外部服务的停机时间),我觉得 DLQ 可能会因失败的消息元数据而溢出。然后,不仅每个分片的
GetRecords
可能会受到限制,它也可能会影响 Firehose(而且我绝对需要 Firehose 始终工作!)。
另一方面,如果我调整 DLQ Lambda 设置以确保它每秒仅被调用 2-3 次(例如通过设置 SQS 并发性并人为地使 Lambda 永远不会少于 300 毫秒),我可能没有足够的时间在 DLQ 上的所有消息在 Kinesis Data Stream 上过期之前使用它们。 本质上:进入 DLQ 的消息数量将超出我的处理能力。
当然,这一切都因系统负载、二等分设置、并行化因子、批量大小、重试次数等而异,但我觉得这是一个难题。
问题:您有什么建议?是否有关于如何“大规模”处理来自 Kinesis 的失败 DLQ 消息的标准指南?我没有找到一种简单的方法来确保 DLQ Lambda 遵守 Shard
GetRecords
限制。
旁注 - 如果我添加更多消费者(目前只有两个,但 Flink 很快就会计划),并且每个消费者都有自己的 DLQ,情况会变得更糟。我暂时忽略它。
虽然
GetRecords
API 仅支持每个分片 5 TPS,但它实际上最多可以返回 10,000 条记录或最多 10MB(source)。
因此,只要您的 DLQ lambda 将失败记录批量分成 10,000 条,我认为它应该有足够的容量来读取所有这些记录,因为每个分片最多可以每秒写入 1000 条记录(或 1MB/秒),所以您不应该在较长时间内每秒超过 10,000 条记录。
对于共享容量,Kinesis 中的每个分片最大容量可支持 2 个读取器(高达 1 MB/秒或 1,000 条记录/秒的写入吞吐量或高达 2 MB/秒或 2,000 条记录/秒的读取吞吐量),因此只要因为你的数量不超过 2 个,所以应该没问题。如果您继续添加更多 DLQ,那么您可以尝试重用相同的 DLQ 读取 lambda 来从多个 DLQ 中读取,这样您就只有 1 个 DLQ 使用者。