我有一个 Azure 函数应用程序,可以读取 blob,然后处理它们。我收到此警告
Unclosed client session
Main 使用await 调用getblob 函数
import logging
from ..shared import blobOpsAIO
import azure.functions as func
import pandas as pd
from ..shared import invOps
import sys, os
async def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
try:
req_body = req.get_json()
file_name_path = req_body.get("file_name_path", "")
file_name = req_body.get("file_name", "")
account_id = req_body.get("account_id", "")
inv = await getBlob(file_name_path, file_name, container_name=account_id)
#process inv
下载文件 获取斑点:
async def getBlob(blobPathName, LOCALFILENAME, dtype_param = None, container_name = 'somevalue', blob_type = 'csv', encoding='UTF-8', skiprows=0, header=0, sep=','):
df = pd.DataFrame()
# some validations
df = await read_blob_content(connection_string, container_name, blobPathName, blob_type, LOCALFILENAME, dtype_param, encoding, skiprows, header, sep)
# do something with df
return df
async def read_blob_content(connection_string, container_name, blob_name, blob_type, LOCALFILENAME, dtype_param, encoding, skiprows=0, header=0, sep=","):
async with aiohttp.ClientSession() as session:
blob_client = BlobClient.from_connection_string(
conn_str=connection_string,
container_name=container_name,
blob_name=blob_name
)
print(f'Going to fetch blobPathName: {blob_name}')
blob_data = await blob_client.download_blob()
async with aiofiles.open(LOCALFILENAME, "wb") as my_blob:
data = await blob_data.readall()
await my_blob.write(data)
await session.close() # also tried placing this line one level up in the identation
success = True
#open the file using pandas and process
return df
except Exception as e:
print(str(e))
raise e
我尝试添加/删除
await session.close()
但没有成功。还将该线放置在与 async with aiohttp.ClientSession() as session
相同的标识级别
使用下面的代码,我能够获得预期的结果,而没有任何警告消息。
import logging
import azure.functions as func
import pandas as pd
import aiohttp
import aiofiles
from azure.storage.blob import BlobClient
connection_string='*********';
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="http_trigger")
async def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
try:
req_body = req.get_json()
file_name_path = req_body.get('file_name_path')
file_name = req_body.get('file_name')
account_id = req_body.get('account_id')
inv = await get_blob(file_name_path, file_name, container_name=account_id)
# Process inv
return func.HttpResponse(f"File {file_name} downloaded and processed successfully!", status_code=200)
except Exception as e:
logging.error(str(e))
return func.HttpResponse("An error occurred while processing the request.", status_code=500)
async def get_blob(file_name_path, file_name, dtype_param=None, container_name='test', blob_type='csv', encoding='UTF-8', skiprows=0, header=0, sep=','):
df = pd.DataFrame()
try:
df = await read_blob_content(
connection_string,
container_name, file_name, blob_type, file_name_path, dtype_param, encoding, skiprows, header, sep
)
except Exception as e:
logging.error(str(e))
raise e
# Process the dataframe
return df
async def read_blob_content(connection_string, container_name, blob_name,blob_type, file_name_path, dtype_param, encoding, skiprows=0, header=0, sep=","):
try:
async with aiohttp.ClientSession() as session:
blob_client = BlobClient.from_connection_string(
conn_str=connection_string,
container_name=container_name,
blob_name=blob_name
)
logging.info(f'Going to fetch blobPathName: {blob_name}')
blob_data = blob_client.download_blob()
async with aiofiles.open(file_name_path, mode='wb') as my_blob:
data = blob_data.readall()
await my_blob.write(data)
except Exception as e:
logging.error(str(e))
raise e
# Read the file using pandas and process
df = pd.read_csv(file_name_path, dtype=dtype_param, encoding=encoding, skiprows=skiprows, header=header, sep=sep)
return df
需求.txt
azure-functions
pandas
aiohttp
aiofiles
azure-storage-blob