停止Lambda触发器

问题描述 投票:0回答:1

我在dynamoDB中有一些记录,并通过serverless.yml文件配置与之关联的触发器。

下面是配置events: - stream: type: dynamodb arn: Fn::GetAtt: - myTable - StreamArn batchSize: 1

但是我要求基于某个标志,如果一个记录以失败结束,那么我应该停止执行所有记录(lambda),然后必须从dynamoDB中删除该事务的所有记录。

我注意到即使从DynamodB删除记录后,触发器仍然继续,请问有没有办法获得有关上下文的现有触发器并停止所有?

P.S我正在使用Nodejs

代码和步骤

Checker.js - 与外部系统交谈并将记录添加到指定的dynamoDB表并退出。以下函数验证数据是通过来自dynamodB的事件调用的。它的无服务器配置如下

  ValidateData :
handler: ValidateData.handler
memorySize: 1536
timeout: 300
events:
  - stream:
      type: dynamodb
      arn:
        Fn::GetAtt:
          - kpiTaskTable
          - StreamArn
      batchSize: 1

验证数据 -

 async.waterfall([
        //go get kpis
        function getKPIs(next) {
            request({
                agent: agent,
                uri: getKPIsURL(endpoint, APIKey),
                maxAttempts: retryCount,
                retryDelay: retryDelayTime,
                retryStrategy: request.RetryStrategies.HTTPOrNetworkError
            }, function (error, response, body) {
                if (error) {
                    console.log("ERROR:Error whilst fetching KPI's: " + error);
                    next(error);
                } else {
                    //need to add in here to check that at least one kpi was returned otherwise error
                    kpis = JSON.parse(body);
                    if (kpis.constructor != Array) {
                        console.log("ERROR:Error KPI's are not of type Array");
                        next(error);
                    } else {
                        next(null);
                    }
                }
            });
        }................................

    function (err) {
        if (err) {
            console.log("ERROR: Something has gone wrong: " + err)
            var stopReportGen = process.env.STOP_REPORT_CREATION_ON_ERROR;
            if (stopReportGen === "true") {
                console.log("Deleting records from dynamoDB for report ID " + reportId);

                kpiUtil.deleteRecordsFromDynamoDB(reportId).then(function () {
                    s3Api.deleteFile(reportBucket, reportName, retryCount, retryDelayTime).then(function () {
                        console.log("INFO : The temp file is deleted from the S3 bucket")
                        callback(null, "ERROR: " + sourceId + "Report ID :" + reportId);
                    }).catch(function (err) {
                        console.log("ERROR : Error in deleting the temp file from the S3 bucket")
                        callback(null, "ERROR:  " + sourceId + "Report ID :" + reportId);
                    })
                });

            }

从Dynamodb删除 - 从DB中删除记录

var AWS = require('aws-sdk');
var fs = require('fs');
var path = require('path');
var zlib = require('zlib');
var fs = require('fs');

(function (exports) {
deleteRecordsFromDynamoDB = function (reportId) {
    return new Promise(function (resolve, reject) {
        var docClient = new AWS.DynamoDB.DocumentClient();
        var table = process.env.KPI_TASK_TABLE;

        var params = {
            TableName: table,
            FilterExpression: "#reportId = :reportId_val",
            ExpressionAttributeNames: {
                "#reportId": "reportId",
            },
            ExpressionAttributeValues: { ":reportId_val": parseInt(reportId) }

        };

        docClient.scan(params, onScan);
        var count = 0;

        function onScan(err, data) {
            if (err) {
                console.error("ERROR: Error, Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
                reject(err);
            } else {
                console.log("Scan succeeded for reportID ::"+reportId);
                data.Items.forEach(function (itemdata) {
                    var delParams = {
                        TableName: table,
                        Key: {
                            "reportSource": itemdata.reportSource
                        }
                    };

                    console.log("Attempting a conditional delete...");
                    docClient.delete(delParams, function (err, data) {
                        if (err) {
                            console.error("ERROR:Error, Unable to delete item. Error JSON:", JSON.stringify(err, null, 2));
                            reject(err);
                        } else {
                            console.log("DeleteItem succeeded:", JSON.stringify(data, null, 2));
                        }
                    });
                    console.log("INFO:Item :", ++count, JSON.stringify(itemdata));



                });

                // continue scanning if we have more items
                if (typeof data.LastEvaluatedKey != "undefined") {
                    console.log("Scanning for more...");
                    params.ExclusiveStartKey = data.LastEvaluatedKey;
                    docClient.scan(params, onScan);
                }else{
                    resolve("sucess");
                }
            }
        }
    });
}
exports.deleteRecordsFromDynamoDB = deleteRecordsFromDynamoDB;
}(typeof exports === 'undefined' ? this['deleteRecordsFromDynamoDB'] = {} :    exports))
node.js amazon-web-services aws-lambda amazon-dynamodb
1个回答
0
投票

基于以上描述,我的理解是删除项目也将创建到lambda的流。您可以通过两种方式忽略删除流: -

1)检查记录中的eventName。如果eventNameREMOVE,您可以忽略Lambda函数中的流

2)在删除Dynamodb中的项目之前,请使用Update Table API禁用DynamoDB表上的流。

请注意,Update Table是异步操作。因此需要一段时间来反映这一变化。在禁用流之前,不应删除这些项目。否则,您可以将选项1和2都实现为更安全的一面。

var params = {
  TableName: "SomeTableName",
  StreamSpecification: {
    StreamEnabled: false    
  }
 };
 dynamodb.updateTable(params, function(err, data) {
   if (err) console.log(err, err.stack); // an error occurred
   else     console.log(data);  
 }  

当您希望Lambda触发器恢复运行时,可能需要启用流。

© www.soinside.com 2019 - 2024. All rights reserved.