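I am trying to speed up downloading blobs from Azure. Using Azure's examples package, I created my own sample. However, it only works for a single file. I would like to be able to pass in multiple customers as a customer_id_list (commented out below) so that the files can be downloaded at the same time. However, I am not sure how to extend the aiohttp code to achieve this.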

import asyncio
from azure.storage.blob.aio import BlobServiceClient


async def download_blob_to_file(blob_service_client: BlobServiceClient, container_name, transaction_date, customer_id):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{transaction_date}/{customer_id}.csv")
    with open(file=f'{customer_id}.csv', mode="wb") as sample_blob:
        download_stream = await blob_client.download_blob()
        data = await download_stream.readall()
        sample_blob.write(data)


async def main(transaction_date, customer_id):
    connect_str = "connection-string"
    blob_serv_client = BlobServiceClient.from_connection_string(connect_str)
    async with blob_serv_client as blob_service_client:
        await download_blob_to_file(blob_service_client, "sample-container", transaction_date, customer_id)


if __name__ == '__main__':
    transaction_date = '20240409'
    customer_id = '001'
    # customer_id_list = ['001', '002', '003', '004']
    asyncio.run(main(transaction_date, customer_id))

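Downloading multiple Azure blobs asynchronously.

You can use the code below with the asyncio.gather function to run multiple coroutines concurrently and download multiple blobs at the same time.

Below is the modified code for downloading multiple Azure blobs asynchronously. In my environment, I created CSV files as you described.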

Code:

import asyncio
from azure.storage.blob.aio import BlobServiceClient


async def download_blob_to_file(blob_service_client: BlobServiceClient, container_name, transaction_date, customer_id):
    # Blobs are stored as "<transaction_date>/<customer_id>.csv" in the container.
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{transaction_date}/{customer_id}.csv")
    with open(file=f'{customer_id}.csv', mode="wb") as sample_blob:
        download_stream = await blob_client.download_blob()
        data = await download_stream.readall()
        sample_blob.write(data)


async def main(transaction_date, customer_id_list):
    connect_str = "<connectionstring>"
    blob_serv_client = BlobServiceClient.from_connection_string(connect_str)
    async with blob_serv_client as blob_service_client:
        # Schedule one download task per customer ID.
        tasks = []
        for customer_id in customer_id_list:
            task = asyncio.create_task(download_blob_to_file(blob_service_client, "test", transaction_date, customer_id))
            tasks.append(task)
        # Wait for all downloads to finish before the client is closed.
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    transaction_date = '20240409'
    customer_id_list = ['001', '002', '003', '004']
    asyncio.run(main(transaction_date, customer_id_list))

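The code above passes the list of customer IDs to main as an argument. Each task created from the list downloads a single blob. asyncio.gather then awaits all of the tasks together, so the downloads overlap instead of running one after another. This shortens the total time needed to download multiple blobs.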
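
With a long customer list, two practical questions come up: what happens when one blob is missing, and how many downloads should run at once. The sketch below is one possible way to handle both, not part of the original answer. It does not touch Azure at all: fake_download is a hypothetical stand-in for download_blob_to_file, and download_all, max_concurrency and the simulated missing blob "003" are names and values assumed purely for illustration.

```python
import asyncio


async def fake_download(customer_id: str, delay: float = 0.1) -> str:
    """Hypothetical stand-in for download_blob_to_file (no Azure calls)."""
    if customer_id == "003":
        # Simulate a blob that does not exist (assumption for the demo).
        raise FileNotFoundError(f"{customer_id}.csv not found")
    await asyncio.sleep(delay)  # simulated network I/O
    return f"{customer_id}.csv"


async def download_all(customer_id_list, max_concurrency: int = 2):
    # The semaphore caps how many downloads are in flight at the same time.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(customer_id):
        async with semaphore:
            return await fake_download(customer_id)

    # return_exceptions=True hands errors back as values, so one failed
    # download does not hide the results of the others.
    results = await asyncio.gather(
        *(bounded(cid) for cid in customer_id_list),
        return_exceptions=True,
    )
    # gather preserves input order, so results line up with the IDs.
    return dict(zip(customer_id_list, results))


if __name__ == "__main__":
    outcome = asyncio.run(download_all(["001", "002", "003", "004"]))
    for customer_id, result in outcome.items():
        status = "failed" if isinstance(result, Exception) else "ok"
        print(customer_id, status, result)
```

To apply the same idea to the real code, the call to fake_download would be replaced by download_blob_to_file with the shared blob_service_client. A suitable value for max_concurrency depends on your network and storage account limits, so treat 2 as a placeholder.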