我是Lambda
和SQS
的新手,我正在尝试创建一个函数来发送电子邮件,在SQS
服务中排队,但我不明白如何调用包含send
+ delete queue
方法的过程函数。
下面我粘贴我的代码:
'use strict';
const AWS = require('aws-sdk');
const SQS = new AWS.SQS({ apiVersion: '2012-11-05' });
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' });
const ses = new AWS.SES({ accessKeyId: "xxxxxxxx", secretAccesskey: "xxxxxxx/xxxxxxxxx" });
const s3 = new AWS.S3({ apiVersion: "2006-03-01", region: "us-west-2" });
const QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/xxxxxxx/queue';
const PROCESS_MESSAGE = 'process-message';
function getPieceOfMail (path, mapObj, replace) {
return new Promise(function (resolve, reject) {
s3.getObject({
Bucket: "myBucket",
Key: "myKey/" + path
}, function (err, data) {
if (err) {
reject(err);
} else {
if (replace === true) {
var re = new RegExp(Object.keys(mapObj).join("|"), "gi");
data = data.Body.toString().replace(re, function (matched) {
return mapObj[matched.toLowerCase()];
});
resolve(data);
} else {
resolve(data.Body.toString());
}
}
});
});
}
function getRegisterSource (nickname, activate_link) {
var activate_link, pieces;
pieces = [
getPieceOfMail("starts/start.html", {}, false),
getPieceOfMail("headers/a.html", {}, false),
getPieceOfMail("footers/a.html", {}, false),
];
return Promise.all(pieces)
.then(function (data) {
return (data[0] + data[1] + data[2]);
})
.catch(function (err) {
return err;
});
}
function sendEmail (email, data) {
return new Promise(function (resolve, reject) {
var params = {
Destination: { ToAddresses: [email] },
Message: {
Body: {
Html: {
Data: data
},
Text: {
Data: data
}
},
Subject: {
Data: "myData"
}
},
Source: "someone <[email protected]>",
};
ses.sendEmail(params, function (err, data) {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
function process(message, callback) {
console.log(message);
// process message
getRegisterSource(event['nickname'], event['user_id'])
.then(function (data) {
return sendEmail(event["email"], data);
})
.catch(function (err) {
console.log("==ERROR==");
callback(err, err);
})
.finally(function () {});
// delete message
const params = {
QueueUrl: QUEUE_URL,
ReceiptHandle: message.ReceiptHandle,
};
SQS.deleteMessage(params, (err) => callback(err, message));
}
function invokePoller(functionName, message) {
const payload = {
operation: PROCESS_MESSAGE,
message,
};
const params = {
FunctionName: functionName,
InvocationType: 'Event',
Payload: new Buffer(JSON.stringify(payload)),
};
return new Promise((resolve, reject) => {
Lambda.invoke(params, (err) => (err ? reject(err) : resolve()));
});
}
function poll(functionName, callback) {
const params = {
QueueUrl: QUEUE_URL,
MaxNumberOfMessages: 10,
VisibilityTimeout: 10,
};
// batch request messages
SQS.receiveMessage(params, (err, data) => {
if (err) {
return callback(err);
}
// for each message, reinvoke the function
const promises = data.Messages.map((message) => invokePoller(functionName, message));
// complete when all invocations have been made
Promise.all(promises).then(() => {
const result = `Messages received: ${data.Messages.length}`;
callback(null, result);
});
});
}
exports.handler = (event, context, callback) => {
try {
if (event.operation === PROCESS_MESSAGE) {
console.log("Invoked by poller");
process(event.message, callback);
} else {
console.log("invoked by schedule");
poll(context.functionName, callback);
}
} catch (err) {
callback(err);
}
};
有人可以告诉我这个吗?
谢谢你的建议。
UPDATE
经过这么多的误解之后,我决定开始研究example提供的qlingxswpoi轮询-SQS的工作原理。
在那里我发现我缺少一些基本的SQS权限,但现在通过添加正确的策略解决了:
AWS
这允许{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": ["*"]
}]
}
调用Lambda.invoke()
。
当process()
被调用时,如果我process(message, callback)
,似乎没有消息,虽然队列被线路console.log(message);
清除
我正在尝试的是结合我的SQS.deleteMessage(params, (err) => callback(err, message));
功能,目前正在使用sendMail服务,所以我只需要SQS
每条消息到push
。
这是AWS SES在发送电子邮件时有其自身限制的常见要求。如果违反了这些限制,SES帐户将自行沙箱。您似乎已使用适当的访问凭据解决了问题。
此代码包含一个Python3 Lambda代码,可用于处理此类情况,其中Lambda使用线程从SQS轮询,并使用SES发送电子邮件,而不超出给定的限制。
链接到queue
。
当在SQS中放置新消息时,您还可以考虑使用SQS中的新功能,该功能能够调用lambdas。但请注意不要超过AWS账户区域内的lambda函数的最大数量。 (见Github Project)