我正在第一次尝试 asyncio botocore 实现。然而,我很确定我没有得到预期的异步性,可能是因为我自己缺乏这方面的经验。 :)
以下方法的目标是复制存储桶中的所有文件,同时为键添加 UUID 后缀。
async def async_duplicate_files_in_bucket(bucket,
how_many_times=1):
session = get_session()
async with session.create_client('s3') as s3_client:
s3_client: S3Client
paginator = s3_client.get_paginator('list_objects')
async for result in paginator.paginate(Bucket=bucket):
for file in result["Contents"]:
# it already includes the prefix in the same
original_file_name: str = file["Key"]
logger.debug(f"Duplicating file: {original_file_name} ")
for _ in range(how_many_times):
new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
copy_source = {
'Bucket': bucket,
'Key': original_file_name
}
await s3_client.copy_object(Bucket=bucket,
CopySource=copy_source,
Key=new_file_name)
print("-", end="")
查看终端时:
Duplicating file: file_1
在完成复制之前不会移动到下一个文件file_1
。就在那时,我得到了一条带有 Duplicating file: file_2
的新日志行。print('-', end="")
未打印鉴于我对
asyncio
的一点经验,我假设 for _ in range(how_many_times)
正在阻塞事件循环。
了解方向,以更好地理解如何在 Python 中使用
asyncio
以及实现该函数的目标。
谢谢。
您没有阻止事件循环。您正在所做的是没有正确使用
asyncio
并发!
使用
asyncio.gather()
帮助器允许您并行运行多个异步操作:
async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
session = get_session()
s3_client: S3Client
async with session.create_client("s3") as s3_client:
awaitables: list[Awaitable[Any]] = []
paginator = s3_client.get_paginator("list_objects")
async for result in paginator.paginate(Bucket=bucket):
for file in result["Contents"]:
# it already includes the prefix in the same
original_file_name: str = file["Key"]
logger.debug(f"Duplicating file: {original_file_name} ")
for _ in range(how_many_times):
new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
copy_source = {"Bucket": bucket, "Key": original_file_name}
awaitable = s3_client.copy_object(
Bucket=bucket,
CopySource=copy_source,
Key=new_file_name,
)
awaitables.append(awaitable)
print("-", end="")
await asyncio.gather(*awaitables)
重要提示:根据分页器中的结果数量,您可能需要通过批处理执行更智能的操作,以避免内存不足!
另请参阅: asyncio.gather() 文档