Run an S3-put-triggered Lambda function on existing S3 objects?

Question · votes: 6 · answers: 5

I have a Lambda function in Node.js that processes new images added to my bucket. I want to run the function on all existing objects. How can I do this? I figured the easiest way would be to "re-put" each object, to trigger the function, but I'm not sure how to do that.

To be clear - I want to run it once on every existing object. The trigger is already working for new objects; I just need to run it on the objects that were inserted before the lambda function was created.
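
For reference, one way to "re-put" an object is to copy it onto itself with replaced metadata. Note that S3 then emits an ObjectCreated:Copy event, so this only fires the trigger if the notification is configured for s3:ObjectCreated:* rather than just :Put. A rough sketch with the Node.js SDK, using a placeholder bucket name:

var aws = require('aws-sdk');
var s3 = new aws.S3();

var BUCKET = 'my-bucket-goes-here'; // placeholder; use your bucket

s3.listObjectsV2({ Bucket: BUCKET }, function(err, data) {
    if (err) return console.log(err, err.stack);
    data.Contents.forEach(function(obj) {
        // Copying an object onto itself requires MetadataDirective REPLACE;
        // S3 rejects an in-place copy that changes nothing.
        s3.copyObject({
            Bucket: BUCKET,
            Key: obj.Key,
            CopySource: BUCKET + '/' + obj.Key, // URL-encode keys with special characters
            MetadataDirective: 'REPLACE',
            Metadata: { reprocessed: 'true' }
        }, function(err) {
            if (err) console.log(err, err.stack);
        });
    });
    // listObjectsV2 returns at most 1000 keys; paginate with
    // NextContinuationToken for larger buckets.
});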

node.js amazon-web-services amazon-s3 lambda
5 Answers

4 votes

The Lambda function below will do what you need.

It iterates over every file in the target S3 bucket and, for each one, executes the desired lambda function against it, simulating a put operation.

You will probably want to give this function a very long execution time limit.

var TARGET_BUCKET="my-bucket-goes-here";
var TARGET_LAMBDA_FUNCTION_NAME="TestFunct";
var S3_PUT_SIMULATION_PARAMS={
  "Records": [
    {
      "eventVersion": "2.0",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "s3": {
        "configurationId": "testConfigRule",
        "object": {
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901",
          "key": "HappyFace.jpg",
          "size": 1024
        },
        "bucket": {
          "arn": "arn:aws:s3:::mybucket",
          "name": "sourcebucket",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          }
        },
        "s3SchemaVersion": "1.0"
      },
      "responseElements": {
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
        "x-amz-request-id": "EXAMPLE123456789"
      },
      "awsRegion": "us-east-1",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "eventSource": "aws:s3"
    }
  ]
};

var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();


exports.handler = (event, context, callback) => {
    retrieveS3BucketContents(TARGET_BUCKET, function(s3Objects){
        // simulateS3PutOperation passes itself as the recursion callback so
        // it can walk the object stack one invocation at a time.
        simulateS3PutOperation(TARGET_BUCKET, s3Objects, simulateS3PutOperation, function(){ 
            console.log("complete."); 
        });
    });
};

function retrieveS3BucketContents(bucket, callback){
    // listObjectsV2 returns at most 1000 keys per call; paginate with
    // ContinuationToken if your bucket is larger than that.
    s3.listObjectsV2({
        Bucket: bucket
    }, function(err, data) {
        if (err) return console.log(err, err.stack);
        callback(data.Contents);
    });
}

function simulateS3PutOperation(bucket, s3ObjectStack, callback, callbackEmpty){
    var params = {
      FunctionName: TARGET_LAMBDA_FUNCTION_NAME, 
      Payload: ""
    };

    if(s3ObjectStack.length > 0){
        var s3Obj = s3ObjectStack.pop();
        // Reuse the template event, overwriting only the bucket and key.
        var p = S3_PUT_SIMULATION_PARAMS;
        p.Records[0].s3.bucket.name = bucket;
        p.Records[0].s3.object.key = s3Obj.Key;
        params.Payload = JSON.stringify(p, null, 2);
        lambda.invoke(params, function(err, data) {
          if (err) console.log(err, err.stack); // an error occurred
          else{
              callback(bucket, s3ObjectStack, callback, callbackEmpty);
          }
        });
    }
    else{
        callbackEmpty();   
    }
}

Here is the full policy the lambda's execution role needs in order to run this method; it allows R/W access to CloudWatch Logs and list access to S3. You need to fill in your bucket details wherever you see MY-BUCKET-GOES-HERE.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1477382207000",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::MY-BUCKET-GOES-HERE"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
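
Note that the function above also calls lambda.invoke, so the execution role additionally needs lambda:InvokeFunction on the target function, which the policy above does not include. A statement along these lines (using the TestFunct name from the code above; swap in your own function) would cover it:

{
    "Effect": "Allow",
    "Action": [
        "lambda:InvokeFunction"
    ],
    "Resource": "arn:aws:lambda:*:*:function:TestFunct"
}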

2 votes

Because I had to run this on a very large bucket, and lambda functions have a max execution time of 10 minutes, I ended up writing a script with the Ruby AWS-SDK.

require 'aws-sdk'  # the Aws:: classes below come from v2+ of the SDK, not aws-sdk-v1

class LambdaS3Invoker

  BUCKET_NAME = "HERE_YOUR_BUCKET"
  FUNCTION_NAME = "HERE_YOUR_FUNCTION_NAME"
  AWS_KEY = "HERE_YOUR_AWS_KEY"
  AWS_SECRET = "HERE_YOUR_AWS_SECRET"
  REGION = "HERE_YOUR_REGION"

  def execute
    bucket.objects({ prefix: 'products'}).each do |o|
      lambda_invoke(o.key)
    end
  end

  private

  def lambda_invoke(key)
    lambda.invoke({
      function_name: FUNCTION_NAME,
      invocation_type: 'Event',
      payload: JSON.generate({
        Records: [{
          s3: {
            object: {
              key: key,
            },
            bucket: {
              name: BUCKET_NAME,
            }
          }
        }]
      })
    })
  end

  def lambda
    @lambda ||= Aws::Lambda::Client.new(
      region: REGION,
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def resource
    @resource ||= Aws::S3::Resource.new(
      region: REGION,
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def bucket
    @bucket ||= resource.bucket(BUCKET_NAME)
  end
end

Then you can call it like this:

LambdaS3Invoker.new.execute

1 vote

This thread helped push me in the right direction, as I needed to invoke a lambda function per file for 50k existing files across two buckets. I decided to write it in python and limit the number of lambda functions running concurrently to 500 (many aws regions have a concurrency limit of 1000).

The script creates a worker pool of 500 threads that feed off a queue of bucket keys. Each worker waits for its lambda to finish before picking up another. Since running this script against my 50k files took a couple of hours, I just ran it from my local machine. Hope this helps someone!

#!/usr/bin/env python

# Proper imports
import json
import time
import base64
from queue import Queue
from threading import Thread
from argh import dispatch_command

import boto3
from boto.s3.connection import S3Connection

client = boto3.client('lambda')

def invoke_lambdas():
    try:
        # replace these with your access keys
        s3 = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
        buckets = [s3.get_bucket('bucket-one'), s3.get_bucket('bucket-two')]

        queue = Queue()
        num_threads = 500

        # create a worker pool
        for i in range(num_threads):
            worker = Thread(target=invoke, args=(queue,))
            worker.daemon = True  # setDaemon() is deprecated in newer Python
            worker.start()

        for bucket in buckets:
            for key in bucket.list():
                queue.put((bucket.name, key.key))

        queue.join()

    except Exception as e:
        print(e)

def invoke(queue):
    while True:
        bucket_key = queue.get()

        try:
            print('Invoking lambda with bucket %s key %s. Remaining to process: %d'
                % (bucket_key[0], bucket_key[1], queue.qsize()))
            trigger_event = {
                'Records': [{
                    's3': {
                        'bucket': {
                            'name': bucket_key[0]
                        },
                        'object': {
                            'key': bucket_key[1]
                        }
                    }
                }]
            }

            # replace lambda_function_name with the actual name
            # InvocationType='RequestResponse' means it will wait until the lambda fn is complete
            response = client.invoke(
                FunctionName='lambda_function_name',
                InvocationType='RequestResponse',
                LogType='None',
                ClientContext=base64.b64encode(json.dumps({}).encode()).decode(),
                Payload=json.dumps(trigger_event).encode()
            )
            if response['StatusCode'] != 200:
                print(response)

        except Exception as e:
            print(e)
            print('Exception during invoke_lambda')

        queue.task_done()

if __name__ == '__main__':
    dispatch_command(invoke_lambdas)

0 votes

What you need to do is create a one-off script that uses the AWS SDK to invoke your lambda function. This solution doesn't require you to "re-put" the objects.

I am going to base my answer on the AWS JS SDK.

"To be clear - I want to run it once on every existing object. The trigger is already working for new objects, I just need to run it on the objects that were inserted before the lambda function was created."

Since you have a working lambda function that accepts S3 put events, what you need to do is find all the unprocessed objects in S3 (if you have a DB entry for each S3 object this should be easy; if not, you might find the S3 list objects function handy: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#listObjectsV2-property).

Then, for each unprocessed S3 object, create a JSON object that looks like an S3 put event message (shown below) and invoke the Lambda invoke function with that JSON object as the payload; a minimal end-to-end sketch follows the event structure below.

You can find the lambda invoke function documented at http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property

When creating the fake S3 put event message object for your lambda function, you can ignore most of the object properties; which ones you need depends on your lambda function. I would guess the minimum you have to set is the bucket name and the object key.

S3 put event message structure: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html

{  
   "Records":[  
      {  
         "eventVersion":"2.0",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"1970-01-01T00:00:00.000Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{  
            "principalId":"AIDAJDPLRKLG7UEXAMPLE"
         },
         "requestParameters":{  
            "sourceIPAddress":"127.0.0.1"
         },
         "responseElements":{  
            "x-amz-request-id":"C3D13FE58DE4C810",
            "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"testConfigRule",
            "bucket":{  
               "name":"mybucket",
               "ownerIdentity":{  
                  "principalId":"A3NL1KOZZKExample"
               },
               "arn":"arn:aws:s3:::mybucket"
            },
            "object":{  
               "key":"HappyFace.jpg",
               "size":1024,
               "eTag":"d41d8cd98f00b204e9800998ecf8427e",
               "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko",
               "sequencer":"0055AED6DCD90281E5"
            }
         }
      }
   ]
}
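
Putting those pieces together, here is a minimal sketch of what such a one-off script could look like with the JS SDK. The bucket and function names are placeholders, and the bare-bones event carries only the bucket name and object key, so extend it if your function reads more fields:

var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();

var BUCKET = 'my-bucket-goes-here';        // placeholder
var FUNCTION_NAME = 'my-image-processor';  // placeholder

s3.listObjectsV2({ Bucket: BUCKET }, function(err, data) {
    if (err) return console.log(err, err.stack);
    data.Contents.forEach(function(obj) {
        // Bare-bones fake put event: bucket name and object key only.
        var fakeEvent = {
            Records: [{
                s3: {
                    bucket: { name: BUCKET },
                    object: { key: obj.Key }
                }
            }]
        };
        lambda.invoke({
            FunctionName: FUNCTION_NAME,
            InvocationType: 'Event', // fire and forget; use RequestResponse to wait
            Payload: JSON.stringify(fakeEvent)
        }, function(err) {
            if (err) console.log(err, err.stack);
        });
    });
    // listObjectsV2 returns at most 1000 keys; paginate with
    // NextContinuationToken for larger buckets.
});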

-1 votes

Basically what you need is to use some api calls (for example, with python) to list all the new objects or all the objects in your s3 bucket and then process those objects.

Here is a snippet:

from boto.s3.connection import S3Connection

conn = S3Connection()  # uses credentials from your environment/config
source = conn.get_bucket(src_bucket)  # src_bucket: your bucket name
src_list = set([key.name for key in source.get_all_keys(headers=None, prefix=prefix)])
# and then you can go over this src list
for entry in src_list:
    do_something(entry)  # placeholder for whatever processing you need