Asyncio 与 boto 和 for range?

问题描述 投票:0回答:1

我正在第一次尝试 asyncio botocore 实现。然而,我很确定我没有得到预期的异步性,可能是因为我自己缺乏这方面的经验。 :)

以下方法的目标是复制存储桶中的所有文件,同时为键添加 UUID 后缀

async def async_duplicate_files_in_bucket(bucket,
                                          how_many_times=1):
    session = get_session()

    async with session.create_client('s3') as s3_client:
        s3_client: S3Client

        paginator = s3_client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket):

            for file in result["Contents"]:
                # it already includes the prefix in the same
                original_file_name: str = file["Key"]
                logger.debug(f"Duplicating file: {original_file_name} ")

                for _ in range(how_many_times):
                    new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
                    copy_source = {
                        'Bucket': bucket,
                        'Key': original_file_name
                    }

                    await s3_client.copy_object(Bucket=bucket,
                                                CopySource=copy_source,
                                                Key=new_file_name)
                    print("-", end="")

查看终端时:

  1. 我看到
    Duplicating file: file_1
    在完成复制之前不会移动到下一个文件
    file_1
    。就在那时,我得到了一条带有
    Duplicating file: file_2
    的新日志行。
  2. print('-', end="")
    未打印

鉴于我对

asyncio
的一点经验,我假设
for _ in range(how_many_times)
正在阻塞事件循环。

了解方向,以更好地理解如何在 Python 中使用

asyncio
以及实现该函数的目标。

谢谢。

python amazon-web-services asynchronous python-asyncio boto
1个回答
0
投票

您没有阻止事件循环。您正在所做的是没有正确使用

asyncio
并发!

使用

asyncio.gather()
帮助器允许您并行运行多个异步操作:

async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
    session = get_session()

    s3_client: S3Client
    async with session.create_client("s3") as s3_client:
        
        awaitables: list[Awaitable[Any]] = []
        paginator = s3_client.get_paginator("list_objects")
        async for result in paginator.paginate(Bucket=bucket):

            for file in result["Contents"]:
                # it already includes the prefix in the same
                original_file_name: str = file["Key"]
                logger.debug(f"Duplicating file: {original_file_name} ")

                for _ in range(how_many_times):
                    new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
                    copy_source = {"Bucket": bucket, "Key": original_file_name}

                    awaitable = s3_client.copy_object(
                        Bucket=bucket,
                        CopySource=copy_source,
                        Key=new_file_name,
                    )
                    awaitables.append(awaitable)
                    print("-", end="")

        await asyncio.gather(*awaitables)

重要提示:根据分页器中的结果数量,您可能需要通过批处理执行更智能的操作,以避免内存不足!

另请参阅: asyncio.gather() 文档

© www.soinside.com 2019 - 2024. All rights reserved.