I have a Lambda function in Node.js that processes new images added to my bucket. I want to run the function for all of the existing objects. How can I do this? I figured the easiest way would be to "re-put" each object, to trigger the function, but I'm not sure how to do that.
To be clear - I want it to run once on each of the existing objects. The trigger is already working for new objects, I just need to run it on the objects that were inserted before the lambda function was created.
The following Lambda function will do what you require.
It will iterate through each file in your target S3 bucket and, for each one, it will execute the desired lambda function against it, simulating a put operation.
You will probably want to set a very long execution time limit on this function.
var TARGET_BUCKET = "my-bucket-goes-here";
var TARGET_LAMBDA_FUNCTION_NAME = "TestFunct";

// Template for the simulated S3 put event; the bucket name and object key
// are overwritten for each object before the target function is invoked.
var S3_PUT_SIMULATION_PARAMS = {
    "Records": [
        {
            "eventVersion": "2.0",
            "eventTime": "1970-01-01T00:00:00.000Z",
            "requestParameters": {
                "sourceIPAddress": "127.0.0.1"
            },
            "s3": {
                "configurationId": "testConfigRule",
                "object": {
                    "eTag": "0123456789abcdef0123456789abcdef",
                    "sequencer": "0A1B2C3D4E5F678901",
                    "key": "HappyFace.jpg",
                    "size": 1024
                },
                "bucket": {
                    "arn": "arn:aws:s3:::mybucket",
                    "name": "sourcebucket",
                    "ownerIdentity": {
                        "principalId": "EXAMPLE"
                    }
                },
                "s3SchemaVersion": "1.0"
            },
            "responseElements": {
                "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
                "x-amz-request-id": "EXAMPLE123456789"
            },
            "awsRegion": "us-east-1",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "EXAMPLE"
            },
            "eventSource": "aws:s3"
        }
    ]
};

var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();

exports.handler = (event, context, callback) => {
    retrieveS3BucketContents(TARGET_BUCKET, function(s3Objects) {
        // simulateS3PutOperation passes itself along as "callback" so it can
        // recurse through the object stack one invocation at a time
        simulateS3PutOperation(TARGET_BUCKET, s3Objects, simulateS3PutOperation, function() {
            console.log("complete.");
        });
    });
};

function retrieveS3BucketContents(bucket, callback) {
    s3.listObjectsV2({
        Bucket: TARGET_BUCKET
    }, function(err, data) {
        callback(data.Contents);
    });
}

function simulateS3PutOperation(bucket, s3ObjectStack, callback, callbackEmpty) {
    var params = {
        FunctionName: TARGET_LAMBDA_FUNCTION_NAME,
        Payload: ""
    };

    if (s3ObjectStack.length > 0) {
        // take the next object off the stack and build a fake put event for it
        var s3Obj = s3ObjectStack.pop();
        var p = S3_PUT_SIMULATION_PARAMS;
        p.Records[0].s3.bucket.name = bucket;
        p.Records[0].s3.object.key = s3Obj.Key;
        params.Payload = JSON.stringify(p, null, 2);
        lambda.invoke(params, function(err, data) {
            if (err) console.log(err, err.stack); // an error occurred
            else {
                // recurse until the stack is empty
                callback(bucket, s3ObjectStack, callback, callbackEmpty);
            }
        });
    }
    else {
        callbackEmpty();
    }
}
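One caveat with the listing above: listObjectsV2 returns at most 1,000 keys per call, so on a larger bucket retrieveS3BucketContents will only see the first page. A paginating variant might look like the following (my own sketch, not part of the original answer; it reuses the same s3 client and follows NextContinuationToken until the listing is no longer truncated):
function retrieveAllS3BucketContents(bucket, callback, token, collected) {
    var params = { Bucket: bucket };
    if (token) params.ContinuationToken = token;
    s3.listObjectsV2(params, function(err, data) {
        if (err) return console.log(err, err.stack);
        var objects = (collected || []).concat(data.Contents);
        if (data.IsTruncated) {
            // more pages remain; follow the continuation token
            retrieveAllS3BucketContents(bucket, callback, data.NextContinuationToken, objects);
        } else {
            callback(objects);
        }
    });
}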
Here is the full policy the lambda will need in order to execute this method; it allows R/W access to CloudWatch Logs and ListBucket access to S3. You need to fill in your bucket details wherever you see MY-BUCKET-GOES-HERE. (Note that s3:ListBucket applies to the bucket ARN itself, without a trailing /*.)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1477382207000",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::MY-BUCKET-GOES-HERE"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
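One thing the policy above does not cover: since the function calls lambda.invoke, its execution role presumably also needs lambda:InvokeFunction on the target function, along the lines of this extra statement (the ARN here is a placeholder):
{
    "Effect": "Allow",
    "Action": [
        "lambda:InvokeFunction"
    ],
    "Resource": "arn:aws:lambda:*:*:function:TestFunct"
}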
Because I had to do this on a very large bucket, and lambda functions have a maximum execution time of 10 minutes, I ended up writing a script with the Ruby AWS-SDK instead.
require 'aws-sdk' # v2+ of the SDK, which provides the Aws:: namespace used below
require 'json'

class LambdaS3Invoker

  BUCKET_NAME = "HERE_YOUR_BUCKET"
  FUNCTION_NAME = "HERE_YOUR_FUNCTION_NAME"
  AWS_KEY = "HERE_YOUR_AWS_KEY"
  AWS_SECRET = "HERE_YOUR_AWS_SECRET"
  REGION = "HERE_YOUR_REGION"

  def execute
    # adjust or drop the prefix to match the objects you want to process
    bucket.objects({ prefix: 'products' }).each do |o|
      lambda_invoke(o.key)
    end
  end

  private

  def lambda_invoke(key)
    lambda.invoke({
      function_name: FUNCTION_NAME,
      invocation_type: 'Event', # async, fire-and-forget
      payload: JSON.generate({
        Records: [{
          s3: {
            object: {
              key: key
            },
            bucket: {
              name: BUCKET_NAME
            }
          }
        }]
      })
    })
  end

  def lambda
    @lambda ||= Aws::Lambda::Client.new(
      region: REGION,
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def resource
    @resource ||= Aws::S3::Resource.new(
      region: REGION,
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def bucket
    @bucket ||= resource.bucket(BUCKET_NAME)
  end
end
Then you can call it like this:
LambdaS3Invoker.new.execute
This thread helped push me in the right direction, as I needed to invoke a lambda function per file for 50k existing files spread across two buckets. I decided to write it in python and to limit the number of lambda functions running simultaneously to 500 (the concurrency limit in many aws regions is 1000).
The script creates a worker pool of 500 threads which feed off a queue of bucket keys. Each worker waits for its lambda to finish before picking up another. Since running this script against my 50k files took a couple of hours, I just ran it from my local machine. Hope this helps someone!
#!/usr/bin/env python

import json
import base64
from queue import Queue
from threading import Thread

from argh import dispatch_command
import boto3
from boto.s3.connection import S3Connection  # legacy boto, used only for listing

client = boto3.client('lambda')


def invoke_lambdas():
    try:
        # replace these with your access keys
        s3 = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
        buckets = [s3.get_bucket('bucket-one'), s3.get_bucket('bucket-two')]

        queue = Queue()
        num_threads = 500

        # create a worker pool
        for i in range(num_threads):
            worker = Thread(target=invoke, args=(queue,))
            worker.daemon = True
            worker.start()

        # feed every key from both buckets into the queue
        for bucket in buckets:
            for key in bucket.list():
                queue.put((bucket.name, key.key))

        queue.join()

    except Exception as e:
        print(e)


def invoke(queue):
    while True:
        bucket_key = queue.get()
        try:
            print('Invoking lambda with bucket %s key %s. Remaining to process: %d'
                  % (bucket_key[0], bucket_key[1], queue.qsize()))
            trigger_event = {
                'Records': [{
                    's3': {
                        'bucket': {
                            'name': bucket_key[0]
                        },
                        'object': {
                            'key': bucket_key[1]
                        }
                    }
                }]
            }
            # replace lambda_function_name with the actual name
            # InvocationType='RequestResponse' means it will wait until the lambda fn is complete
            response = client.invoke(
                FunctionName='lambda_function_name',
                InvocationType='RequestResponse',
                LogType='None',
                ClientContext=base64.b64encode(json.dumps({}).encode()).decode(),
                Payload=json.dumps(trigger_event).encode()
            )
            if response['StatusCode'] != 200:
                print(response)
        except Exception as e:
            print(e)
            print('Exception during invoke_lambda')
        queue.task_done()


if __name__ == '__main__':
    dispatch_command(invoke_lambdas)
What you need to do is create a one-off script which uses the AWS SDK to invoke your lambda function. This solution doesn't require you to "re-put" the objects.
I am going to base my answer on the AWS JS SDK.
To be clear - I want it to run once on each of the existing objects. The trigger is already working for new objects, I just need to run it on the objects that were inserted before the lambda function was created.
Since you have a working lambda function that accepts S3 put events, what you need to do is find all the unprocessed objects in S3 (this should be easy if you have a DB entry for each S3 object; if not, you might find the S3 listObjectsV2 function handy: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#listObjectsV2-property).
Then, for each unprocessed S3 object you found, create a JSON object that looks like an S3 put event message (shown below) and invoke the Lambda invoke function with that JSON object as the payload.
You can find the lambda invoke function docs at http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property
When creating the fake S3 put event message object for your lambda function, you can omit most of the object properties, depending on what your lambda function actually reads. I would guess the minimum you'll have to set is the bucket name and the object key.
S3 put event message structure: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
{
    "Records": [
        {
            "eventVersion": "2.0",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "1970-01-01T00:00:00.000Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "AIDAJDPLRKLG7UEXAMPLE"
            },
            "requestParameters": {
                "sourceIPAddress": "127.0.0.1"
            },
            "responseElements": {
                "x-amz-request-id": "C3D13FE58DE4C810",
                "x-amz-id-2": "FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "testConfigRule",
                "bucket": {
                    "name": "mybucket",
                    "ownerIdentity": {
                        "principalId": "A3NL1KOZZKExample"
                    },
                    "arn": "arn:aws:s3:::mybucket"
                },
                "object": {
                    "key": "HappyFace.jpg",
                    "size": 1024,
                    "eTag": "d41d8cd98f00b204e9800998ecf8427e",
                    "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko",
                    "sequencer": "0055AED6DCD90281E5"
                }
            }
        }
    ]
}
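Putting that together, a minimal sketch with the JS SDK might look like the following (the bucket and function names are placeholders; pagination and retries are omitted for brevity):
var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();

s3.listObjectsV2({ Bucket: 'my-bucket' }, function(err, data) {
    if (err) return console.log(err, err.stack);
    data.Contents.forEach(function(obj) {
        // minimal fake put event: just the bucket name and the object key
        var fakeEvent = {
            Records: [{
                s3: {
                    bucket: { name: 'my-bucket' },
                    object: { key: obj.Key }
                }
            }]
        };
        lambda.invoke({
            FunctionName: 'my-image-processor', // placeholder name
            InvocationType: 'Event', // async, don't wait for the result
            Payload: JSON.stringify(fakeEvent)
        }, function(err) {
            if (err) console.log(err, err.stack);
        });
    });
});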
Basically what you need is to use some API calls (boto, for example, if you use Python) to list all the new objects or all the objects in your s3 bucket, and then process them.
Here is a snippet:
from boto.s3.connection import S3Connection

conn = S3Connection()
source = conn.get_bucket(src_bucket)  # src_bucket is your bucket name
src_list = set([key.name for key in source.get_all_keys(headers=None, prefix=prefix)])

# and then you can go over this src list
for entry in src_list:
    # do something with each key, e.g. invoke your lambda for it
    print(entry)