我试图为一个 Kinesis 设置一个 DLQ.我使用了 SQS 并将其设置为 Kinesis 的故障目标。
Kinesis 被连接到一个总是抛出错误的 lambda,所以事件会立即进入 SQS 的 DLQ。
我可以在 SQS 中看到事件,但是缺少事件的有效载荷(我作为事件的一部分发送的 json),在 lambda 中,如果我在抛出异常之前打印事件,我可以看到 base64 编码的数据,但是在我的 DLQ 中看不到。
有没有办法把事件数据也发送到DLQ中?我希望能够正确地检查错误的原因,并在我完成修复lambda中的问题后将事件放回Kinesis。
https:/docs.aws.amazon.comlambdalatestdg/with-kinesis.html#services-kinesis-errors。
实际的记录并不包括在内,所以你必须处理这个记录,并在它们过期和丢失之前从流中检索它们。
根据上面的内容,事件有效载荷不会被发送到DLQ事件中,所以这里预计会出现 "丢失事件数据"。
因此,为了找回实际的记录,你可能想尝试像这样的方法
1) 假设我们有以下 kinesis 批量信息。
{
"KinesisBatchInfo": {
"shardId": "shardId-000000000001",
"startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
"endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
"approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
"approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
"batchSize": 500,
"streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
}
}
2)我们可以通过这样的方式来找回记录。
import AWS from 'aws-sdk';
const kinesis = new AWS.Kinesis();
const ShardId = 'shardId-000000000001';
const ShardIteratorType = 'AT_SEQUENCE_NUMBER';
const StreamName = 'my-awesome-stream';
const StartingSequenceNumber =
'49601189658422359378836298521827638475320189012309704722';
const { ShardIterator } = await kinesis
.getShardIterator({
ShardId,
ShardIteratorType,
StreamName,
StartingSequenceNumber,
})
.promise();
const records = await kinesis
.getRecords({
ShardIterator,
})
.promise();
console.log('Records', records);
注意:不要忘记确保你的进程有权限1) kinesis:GetShardIterator 2) kinesis:GetRecords
希望对大家有所帮助!