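In a [previous question][1] I asked how to download data asynchronously and got an answer. However, when I run the code in an Azure Function and the function is invoked several times, I get the following error: `asyncio.run() cannot be called from a running event loop`. In this case I assume the invocations are running on the same thread. I know you can check for a running event loop, but I'd appreciate advice on where best to do that and how to incorporate it into the code below.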
```python
import asyncio

from azure.storage.blob.aio import BlobServiceClient


async def download_blob_to_file(blob_service_client: BlobServiceClient, container_name, transaction_date, customer_id):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{transaction_date}/{customer_id}.csv")
    with open(file=f'{customer_id}.csv', mode="wb") as sample_blob:
        download_stream = await blob_client.download_blob()
        data = await download_stream.readall()
        sample_blob.write(data)


async def main(transaction_date, customer_id):
    connect_str = "connection-string"
    blob_serv_client = BlobServiceClient.from_connection_string(connect_str)
    async with blob_serv_client as blob_service_client:
        await download_blob_to_file(blob_service_client, "sample-container", transaction_date, customer_id)


if __name__ == '__main__':
    transaction_date = '20240409'
    customer_id = '001'
    # customer_id_list = ['001', '002', '003', '004']
    asyncio.run(main(transaction_date, customer_id))
```
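One common way to do the check mentioned above: `asyncio.get_running_loop()` raises `RuntimeError` when no event loop is running in the current thread, so a helper can fall back to `asyncio.run()` only in that case and otherwise schedule the coroutine on the loop that is already running. This is a minimal sketch; `run_or_schedule` and `fake_download` are hypothetical names, and `fake_download` only stands in for the real blob download.

```python
import asyncio
from typing import Any, Coroutine


def run_or_schedule(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run `coro` to completion if no loop is running, else schedule it.

    Returns the coroutine's result in the first case and an asyncio.Task
    in the second; the caller must await (or keep a reference to) that task.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop in this thread: starting one with asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running (e.g. inside an async host): don't nest a new one.
    return loop.create_task(coro)


async def fake_download(customer_id: str) -> str:
    """Hypothetical stand-in for download_blob_to_file; makes no Azure calls."""
    await asyncio.sleep(0)
    return f"{customer_id}.csv"


async def called_from_async_code() -> str:
    # Inside a running loop the helper returns a Task, which we await here.
    task = run_or_schedule(fake_download("002"))
    return await task


result_sync = run_or_schedule(fake_download("001"))  # "001.csv"
result_async = asyncio.run(called_from_async_code())  # "002.csv"
```

The helper is only useful at synchronous call sites such as the `if __name__ == '__main__':` block; inside an `async def` function it is simpler to `await` the coroutine directly.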
[1]: https://stackoverflow.com/questions/78377808/download-multiple-azure-blobs-asynchronously
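I was able to download the CSV blobs using an HTTP-triggered function in the Python v1 programming model. The entry point `main` is declared `async def`, so the Functions host runs it on its own event loop and the function simply awaits the downloads; there is no `asyncio.run()` call that could collide with an already running loop.

Code:

`__init__.py`: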
```python
import asyncio
import os

import azure.functions as func
from azure.storage.blob.aio import BlobServiceClient


async def download_blob_to_file(blob_service_client: BlobServiceClient, container_name, transaction_date, customer_id):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{transaction_date}/{customer_id}.csv")
    async with blob_client:
        # Download the blob and read its full content into memory
        data = await blob_client.download_blob()
        content = await data.readall()
        # Write it to a local file named after the customer ID
        with open(f'{customer_id}.csv', mode="wb") as file:
            file.write(content)


async def main(req: func.HttpRequest) -> func.HttpResponse:
    # e.g. ?transaction_date=20240409&customer_ids=001,002,003,004
    transaction_date = req.params.get('transaction_date')
    customer_ids_param = req.params.get('customer_ids')
    if transaction_date is None or customer_ids_param is None:
        return func.HttpResponse("Missing parameters", status_code=400)

    customer_id_list = customer_ids_param.split(',')
    connect_str = os.environ["Connect_str"]
    blob_serv_client = BlobServiceClient.from_connection_string(connect_str)

    # main already runs on the host's event loop, so await directly
    # instead of calling asyncio.run()
    async with blob_serv_client:
        tasks = []
        for customer_id in customer_id_list:
            task = asyncio.create_task(download_blob_to_file(blob_serv_client, "sample-container", transaction_date, customer_id))
            tasks.append(task)
        await asyncio.gather(*tasks)

    return func.HttpResponse("Blob download completed.", status_code=200)
```
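The difference between the question's approach and this one can be reproduced without Azure. In the sketch below, `simulated_host` is a hypothetical stand-in for a host that runs several invocations concurrently on one event loop (it is not the real Functions worker), and `fake_download` replaces the blob download. Calling `asyncio.run()` from code that is already running on that loop raises the error from the question, while awaiting works.

```python
import asyncio
from typing import List, Tuple


async def fake_download(customer_id: str) -> str:
    """Hypothetical stand-in for download_blob_to_file; makes no Azure calls."""
    await asyncio.sleep(0.01)
    return f"{customer_id}.csv"


async def entry_point_with_asyncio_run(customer_id: str) -> str:
    """Mimics the question: asyncio.run() called while a loop is already running."""
    coro = fake_download(customer_id)
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"error: {exc}"


async def entry_point_awaiting(customer_id: str) -> str:
    """Mimics the answer: simply await on the loop the host is running."""
    return await fake_download(customer_id)


async def simulated_host() -> Tuple[List[str], List[str]]:
    """Hypothetical host running several invocations concurrently on one loop."""
    ids = ["001", "002", "003"]
    broken = await asyncio.gather(*(entry_point_with_asyncio_run(i) for i in ids))
    working = await asyncio.gather(*(entry_point_awaiting(i) for i in ids))
    return list(broken), list(working)


broken, working = asyncio.run(simulated_host())
```

Each entry in `broken` contains the message `asyncio.run() cannot be called from a running event loop`, while `working` holds the three file names.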
`local.settings.json`:

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<storage_conne>",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "Connect_str": "<storage_conne>"
  }
}
```
Output:

The blob files were downloaded successfully with the HTTP-triggered function above.

Browser output:

`http://localhost:7071/api/HttpTrigger1?transaction_date=20240409&customer_ids=001,002,003,004`

Blob download completed.
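The same request can also be sent from a script instead of the browser. A minimal sketch using only the standard library; `build_download_url` and `trigger_download` are hypothetical helper names, and the base URL assumes the local Functions host and the `HttpTrigger1` function name shown above.

```python
from typing import List
from urllib.parse import urlencode
from urllib.request import urlopen

# Assumes the local Functions host and function name used in this answer.
BASE_URL = "http://localhost:7071/api/HttpTrigger1"


def build_download_url(transaction_date: str, customer_ids: List[str]) -> str:
    """Build the query string the function's main() expects."""
    query = urlencode(
        {"transaction_date": transaction_date, "customer_ids": ",".join(customer_ids)},
        safe=",",  # keep commas literal so the URL matches the browser example
    )
    return f"{BASE_URL}?{query}"


def trigger_download(transaction_date: str, customer_ids: List[str], timeout: float = 30.0) -> str:
    """Call the function and return the response body.

    urlopen raises urllib.error.HTTPError for non-2xx responses, e.g. the
    400 "Missing parameters" reply.
    """
    with urlopen(build_download_url(transaction_date, customer_ids), timeout=timeout) as resp:
        return resp.read().decode("utf-8")
```

With the function running locally, `trigger_download("20240409", ["001", "002", "003", "004"])` sends the same request as the browser example.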