SQS到Lambda + SES

问题描述 投票:3回答:1

我是LambdaSQS的新手,我正在尝试创建一个函数来发送电子邮件,在SQS服务中排队,但我不明白如何调用包含send + delete queue方法的过程函数。

下面我粘贴我的代码:

'use strict';

const AWS = require('aws-sdk');

const SQS = new AWS.SQS({ apiVersion: '2012-11-05' });
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' });
const ses = new AWS.SES({ accessKeyId: "xxxxxxxx", secretAccesskey: "xxxxxxx/xxxxxxxxx" });
const s3 = new AWS.S3({ apiVersion: "2006-03-01", region: "us-west-2" });


const QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/xxxxxxx/queue';
const PROCESS_MESSAGE = 'process-message';

function getPieceOfMail (path, mapObj, replace) {
  return new Promise(function (resolve, reject) {
    s3.getObject({
      Bucket: "myBucket",
      Key: "myKey/" + path
    }, function (err, data) {
      if (err) {
        reject(err);
      } else {
        if (replace === true) {
            var re = new RegExp(Object.keys(mapObj).join("|"), "gi");
            data = data.Body.toString().replace(re, function (matched) {
              return mapObj[matched.toLowerCase()];
            });
            resolve(data);
        } else {
            resolve(data.Body.toString());
        }
      }
    });
  });
}

function getRegisterSource (nickname, activate_link) {
  var activate_link, pieces;

  pieces = [
    getPieceOfMail("starts/start.html", {}, false),
    getPieceOfMail("headers/a.html", {}, false),
    getPieceOfMail("footers/a.html", {}, false),
  ];

  return Promise.all(pieces)
    .then(function (data) {
      return (data[0] + data[1] + data[2]);
    })
    .catch(function (err) {
      return err;
    });
}

function sendEmail (email, data) {
    return new Promise(function (resolve, reject) {
        var params = {
            Destination: { ToAddresses: [email] },
            Message: {
              Body: {
                Html: {
                  Data: data
                },
                Text: {
                  Data: data
                }
              },
              Subject: {
                Data: "myData"
              }
            },
            Source: "someone <[email protected]>",
        };

        ses.sendEmail(params, function (err, data) {
            if (err) {
                reject(err);
            } else {
                resolve(data);
            }
        });
    });
}

function process(message, callback) {
    console.log(message);

    // process message
    getRegisterSource(event['nickname'], event['user_id'])
      .then(function (data) {
        return sendEmail(event["email"], data);
      })
      .catch(function (err) {
        console.log("==ERROR==");
        callback(err, err);
      })
      .finally(function () {});

    // delete message
    const params = {
        QueueUrl: QUEUE_URL,
        ReceiptHandle: message.ReceiptHandle,
    };
    SQS.deleteMessage(params, (err) => callback(err, message));
}

function invokePoller(functionName, message) {
    const payload = {
        operation: PROCESS_MESSAGE,
        message,
    };
    const params = {
        FunctionName: functionName,
        InvocationType: 'Event',
        Payload: new Buffer(JSON.stringify(payload)),
    };
    return new Promise((resolve, reject) => {
        Lambda.invoke(params, (err) => (err ? reject(err) : resolve()));
    });
}

function poll(functionName, callback) {
    const params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 10,
    };
    // batch request messages
    SQS.receiveMessage(params, (err, data) => {
        if (err) {
            return callback(err);
        }
        // for each message, reinvoke the function
        const promises = data.Messages.map((message) => invokePoller(functionName, message));
        // complete when all invocations have been made
        Promise.all(promises).then(() => {
            const result = `Messages received: ${data.Messages.length}`;
            callback(null, result);
        });
    });
}

exports.handler = (event, context, callback) => {
    try {
        if (event.operation === PROCESS_MESSAGE) {
            console.log("Invoked by poller");
            process(event.message, callback);
        } else {
            console.log("invoked by schedule");
            poll(context.functionName, callback);
        }
    } catch (err) {
        callback(err);
    }
};

有人可以告诉我这个吗?

谢谢你的建议。

UPDATE

经过这么多的误解之后,我决定开始研究example提供的qlingxswpoi轮询-SQS的工作原理。

在那里我发现我缺少一些基本的SQS权限,但现在通过添加正确的策略解决了:

AWS

这允许{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": ["*"] }] } 调用Lambda.invoke()

process()被调用时,如果我process(message, callback),似乎没有消息,虽然队列被线路console.log(message);清除

我正在尝试的是结合我的SQS.deleteMessage(params, (err) => callback(err, message));功能,目前正在使用sendMail服务,所以我只需要SQS每条消息到push

node.js amazon-web-services aws-lambda amazon-sqs
1个回答
0
投票

这是AWS SES在发送电子邮件时有其自身限制的常见要求。如果违反了这些限制,SES帐户将自行沙箱。您似乎已使用适当的访问凭据解决了问题。

此代码包含一个Python3 Lambda代码,可用于处理此类情况,其中Lambda使用线程从SQS轮询,并使用SES发送电子邮件,而不超出给定的限制。

链接到queue

当在SQS中放置新消息时,您还可以考虑使用SQS中的新功能,该功能能够调用lambdas。但请注意不要超过AWS账户区域内的lambda函数的最大数量。 (见Github Project

© www.soinside.com 2019 - 2024. All rights reserved.