我在dynamoDB中有一些记录,并通过serverless.yml文件配置与之关联的触发器。
下面是配置events:
- stream:
type: dynamodb
arn:
Fn::GetAtt:
- myTable
- StreamArn
batchSize: 1
但是我要求基于某个标志,如果一个记录以失败结束,那么我应该停止执行所有记录(lambda),然后必须从dynamoDB中删除该事务的所有记录。
我注意到即使从DynamodB删除记录后,触发器仍然继续,请问有没有办法获得有关上下文的现有触发器并停止所有?
P.S我正在使用Nodejs
代码和步骤
Checker.js - 与外部系统交谈并将记录添加到指定的dynamoDB表并退出。以下函数验证数据是通过来自dynamodB的事件调用的。它的无服务器配置如下
ValidateData :
handler: ValidateData.handler
memorySize: 1536
timeout: 300
events:
- stream:
type: dynamodb
arn:
Fn::GetAtt:
- kpiTaskTable
- StreamArn
batchSize: 1
验证数据 -
async.waterfall([
//go get kpis
function getKPIs(next) {
request({
agent: agent,
uri: getKPIsURL(endpoint, APIKey),
maxAttempts: retryCount,
retryDelay: retryDelayTime,
retryStrategy: request.RetryStrategies.HTTPOrNetworkError
}, function (error, response, body) {
if (error) {
console.log("ERROR:Error whilst fetching KPI's: " + error);
next(error);
} else {
//need to add in here to check that at least one kpi was returned otherwise error
kpis = JSON.parse(body);
if (kpis.constructor != Array) {
console.log("ERROR:Error KPI's are not of type Array");
next(error);
} else {
next(null);
}
}
});
}................................
function (err) {
if (err) {
console.log("ERROR: Something has gone wrong: " + err)
var stopReportGen = process.env.STOP_REPORT_CREATION_ON_ERROR;
if (stopReportGen === "true") {
console.log("Deleting records from dynamoDB for report ID " + reportId);
kpiUtil.deleteRecordsFromDynamoDB(reportId).then(function () {
s3Api.deleteFile(reportBucket, reportName, retryCount, retryDelayTime).then(function () {
console.log("INFO : The temp file is deleted from the S3 bucket")
callback(null, "ERROR: " + sourceId + "Report ID :" + reportId);
}).catch(function (err) {
console.log("ERROR : Error in deleting the temp file from the S3 bucket")
callback(null, "ERROR: " + sourceId + "Report ID :" + reportId);
})
});
}
从Dynamodb删除 - 从DB中删除记录
var AWS = require('aws-sdk');
var fs = require('fs');
var path = require('path');
var zlib = require('zlib');
var fs = require('fs');
(function (exports) {
deleteRecordsFromDynamoDB = function (reportId) {
return new Promise(function (resolve, reject) {
var docClient = new AWS.DynamoDB.DocumentClient();
var table = process.env.KPI_TASK_TABLE;
var params = {
TableName: table,
FilterExpression: "#reportId = :reportId_val",
ExpressionAttributeNames: {
"#reportId": "reportId",
},
ExpressionAttributeValues: { ":reportId_val": parseInt(reportId) }
};
docClient.scan(params, onScan);
var count = 0;
function onScan(err, data) {
if (err) {
console.error("ERROR: Error, Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
reject(err);
} else {
console.log("Scan succeeded for reportID ::"+reportId);
data.Items.forEach(function (itemdata) {
var delParams = {
TableName: table,
Key: {
"reportSource": itemdata.reportSource
}
};
console.log("Attempting a conditional delete...");
docClient.delete(delParams, function (err, data) {
if (err) {
console.error("ERROR:Error, Unable to delete item. Error JSON:", JSON.stringify(err, null, 2));
reject(err);
} else {
console.log("DeleteItem succeeded:", JSON.stringify(data, null, 2));
}
});
console.log("INFO:Item :", ++count, JSON.stringify(itemdata));
});
// continue scanning if we have more items
if (typeof data.LastEvaluatedKey != "undefined") {
console.log("Scanning for more...");
params.ExclusiveStartKey = data.LastEvaluatedKey;
docClient.scan(params, onScan);
}else{
resolve("sucess");
}
}
}
});
}
exports.deleteRecordsFromDynamoDB = deleteRecordsFromDynamoDB;
}(typeof exports === 'undefined' ? this['deleteRecordsFromDynamoDB'] = {} : exports))
基于以上描述,我的理解是删除项目也将创建到lambda的流。您可以通过两种方式忽略删除流: -
1)检查记录中的eventName
。如果eventName
是REMOVE
,您可以忽略Lambda函数中的流
2)在删除Dynamodb中的项目之前,请使用Update Table API
禁用DynamoDB表上的流。
请注意,Update Table是异步操作。因此需要一段时间来反映这一变化。在禁用流之前,不应删除这些项目。否则,您可以将选项1和2都实现为更安全的一面。
var params = {
TableName: "SomeTableName",
StreamSpecification: {
StreamEnabled: false
}
};
dynamodb.updateTable(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(data);
}
当您希望Lambda触发器恢复运行时,可能需要启用流。