concurrent.futures in an AWS Lambda function

Problem description

I am using an AWS Lambda Python function to copy EBS/RDS snapshots to another region for disaster recovery. The problem I am running into is the limit of 5 concurrent snapshot copies per destination region. If I try to copy more than 5 at once, I get this error:

botocore.exceptions.ClientError: An error occurred (ResourceLimitExceeded) when calling the CopySnapshot operation: Too many snapshot copies in progress. The limit is 5 for this destination region. 
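As background, this error can also be handled by catching it and retrying with a backoff instead of (or in addition to) a waiter. The helper below is a hedged sketch: the name `copy_with_retry`, its signature, and the backoff values are illustrative, not from the original script, and in real code you would normally catch `botocore.exceptions.ClientError` directly.

```python
import time

def copy_with_retry(client, source_region, snapshot_id, max_retries=5, delay=30):
    """Illustrative helper: retry CopySnapshot while the destination
    region's concurrent-copy limit (ResourceLimitExceeded) is hit."""
    for attempt in range(max_retries):
        try:
            return client.copy_snapshot(
                SourceRegion=source_region,
                SourceSnapshotId=snapshot_id,
            )
        except Exception as e:
            # botocore's ClientError exposes the error code on e.response;
            # getattr keeps this sketch runnable without botocore installed.
            code = getattr(e, "response", {}).get("Error", {}).get("Code", "")
            if code != "ResourceLimitExceeded":
                raise
            time.sleep(delay * (attempt + 1))  # simple linear backoff
    raise RuntimeError("CopySnapshot still throttled after %d retries" % max_retries)
```

Note that a plain retry loop keeps the copies serial; it only papers over the limit, which is why the question below moves toward running copies in parallel.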

To avoid this, I added a waiter that checks the status of the snapshot in the destination region and only continues the loop once the snapshot reaches the completed state. It works, but this way only one snapshot is copied at a time. The question is: how can I use the concurrent.futures module to run this as parallel tasks, copying 5 snapshots at the same time?

waiter = client_ec2_dst.get_waiter('snapshot_completed')

for i in ec2_snapshots_src:
    try:
        response = client_ec2_dst.copy_snapshot(
            Description='[Disaster Recovery] copied from us-east-1',
            SourceRegion=region_src,
            SourceSnapshotId=i["SnapshotId"],
            DryRun=False
        )
        new_snapshot_id = response["SnapshotId"]
        # Block until this copy completes before starting the next one
        waiter.wait(
            SnapshotIds=[new_snapshot_id],
            WaiterConfig={'Delay': 5, 'MaxAttempts': 120}
        )
    except Exception as e:
        raise e

The full EBS copy section of the script:

def lambda_handler(event, context):
    message = ""
    ec2_snapshots_src = get_ec2_snapshots_src()
    # print(*ec2_snapshot_list_src, sep="\n")
    if len(ec2_snapshots_src) == 0:
        message_error = ("No automated daily EBS snapshots found in " + region_src)
        send_sns(subject_alert, message_error)
    else:
        if message:
            message += "\n\n"
        # Copying EBS snapshots from source region to destination region
        waiter = client_ec2_dst.get_waiter('snapshot_completed')
        c = 0
        for i in ec2_snapshots_src:
            if c < 50:
                snapshot_tags_filtered = ([item for item in i["Tags"] if item['Key'] != 'aws:backup:source-resource'])
                snapshot_tags_filtered.append({'Key': 'delete_On', 'Value': delete_on})
                snapshot_tags_filtered.append({'Key': 'src_Id', 'Value': i["SnapshotId"]})
                try:
                    response = client_ec2_dst.copy_snapshot(
                        Description='[Disaster Recovery] copied from us-east-1',
                        SourceRegion=region_src,
                        SourceSnapshotId=i["SnapshotId"],
                        DryRun=False,
                    #           Encrypted=True,
                    #           KmsKeyId='1e287363-89f6-4837-a619-b550ff28c211',
                    )
                    new_snapshot_id = response["SnapshotId"]
                    waiter.wait(
                        SnapshotIds=[new_snapshot_id],
                        WaiterConfig={'Delay': 5, 'MaxAttempts': 120}
                    )
                    snapshot_src_name = ([dic['Value'] for dic in snapshot_tags_filtered if dic['Key'] == 'Name'])
                    message += ("Started copying latest EBS snapshot: " + i["SnapshotId"] + " for EC2 instance: " + str(snapshot_src_name) + " from: " + region_src + " to: " + region_dst + " with new id: " + new_snapshot_id + ".\n")

                    # Adding tags to snapshots in destination region
                    tag_src = [new_snapshot_id]
                    tag = client_ec2_dst.create_tags(
                        DryRun=False,
                        Resources=tag_src,
                        Tags=snapshot_tags_filtered
                    )
                except Exception as e:
                    raise e

                c = c + 1
                # time.sleep(60)
            else:
                message_error = ("There are more than 50 EBS snapshots that need to be copied to " + region_dst)
                send_sns(subject_alert, message_error)
                # return instead of exit(): raising SystemExit inside a Lambda handler is an error
                return

    if message:
        send_sns(subject_notify, message)
        print(message)
    else:
        message_error = "Message wasn't generated by script, check lambda function!"
        send_sns(subject_alert, message_error)
Tags: python, amazon-web-services, aws-lambda, boto3, concurrent.futures
1 Answer

You can use a concurrent.futures executor with the max_workers parameter to limit how many jobs run at the same time. Like this:

import concurrent.futures

def copy_snapshot(snapshot_id):
    waiter = client_ec2_dst.get_waiter('snapshot_completed')
    response = client_ec2_dst.copy_snapshot(
        Description='[Disaster Recovery] copied from us-east-1',
        SourceRegion=region_src,
        SourceSnapshotId=snapshot_id,
        DryRun=False
    )
    new_snapshot_id = response["SnapshotId"]
    waiter.wait(
        SnapshotIds=[new_snapshot_id],
        WaiterConfig={'Delay': 5, 'MaxAttempts': 120}
    )
    return new_snapshot_id

# Copy snapshots in parallel, but no more than 5 at a time:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(copy_snapshot, s['SnapshotId'])
        for s in ec2_snapshots_src]
    for future in futures:
        # result() blocks until that copy finishes and re-raises any exception
        future.result()
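If you also want to collect the new snapshot IDs and keep one failed copy from aborting or masking the others, `concurrent.futures.as_completed` fits well. The `run_limited` helper below is a generic sketch of that pattern; the name and signature are illustrative, not part of the answer above:

```python
import concurrent.futures

def run_limited(fn, items, max_workers=5):
    """Run fn over items with at most max_workers in flight,
    collecting successful results and (item, exception) pairs."""
    results, errors = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fn, item): item for item in items}
        for future in concurrent.futures.as_completed(futures):
            item = futures[future]
            try:
                results.append(future.result())
            except Exception as exc:
                errors.append((item, exc))
    return results, errors
```

In the snapshot case you would call it as `run_limited(copy_snapshot, [s['SnapshotId'] for s in ec2_snapshots_src])` and report any entries in `errors` via SNS instead of letting the first failure propagate.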