python:从 ADL Gen 2 异步下载 blob 时出现未关闭的客户端会话警告

问题描述 投票:0回答:1

我有一个 Azure 函数应用,它会读取 blob 并对其进行处理。运行时我收到以下警告:

Unclosed client session

main 函数使用 await 调用 getBlob 函数:

import logging
from ..shared import blobOpsAIO
import azure.functions as func
import pandas as pd
from ..shared import invOps
import sys, os

# HTTP-triggered entry point: reads the blob coordinates from the JSON request
# body and awaits getBlob with them.
# NOTE(review): the snippet is truncated -- the `except`/`finally` that must
# close the `try` below, and the code that builds the HttpResponse, are not shown.
async def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    try:
        # Each field falls back to an empty string when absent from the body.
        req_body = req.get_json()
        file_name_path = req_body.get("file_name_path", "")
        file_name = req_body.get("file_name", "")
        account_id = req_body.get("account_id", "")

        # account_id is used as the storage container name.
        inv = await getBlob(file_name_path, file_name, container_name=account_id)

        # process inv (placeholder; the processing code is not shown)

用于下载文件的 getBlob 函数:

# Thin async wrapper: forwards its arguments to read_blob_content and returns
# the resulting DataFrame.
# NOTE(review): `connection_string` is not defined in this snippet; presumably
# a module-level value -- confirm.
async def getBlob(blobPathName, LOCALFILENAME, dtype_param = None, container_name = 'somevalue', blob_type = 'csv', encoding='UTF-8', skiprows=0, header=0, sep=','):

        # Initial empty frame; it is overwritten by the awaited result below.
        df = pd.DataFrame()
        # some validations (placeholder; not shown)
        # blobPathName is passed as the blob name, LOCALFILENAME as the local target path.
        df = await read_blob_content(connection_string, container_name, blobPathName, blob_type, LOCALFILENAME, dtype_param, encoding, skiprows, header, sep)
        # do something with df (placeholder; not shown)
        return df
# Downloads a blob, writes its bytes to LOCALFILENAME, and (per the placeholder
# comment below) is meant to load that file with pandas.
# NOTE(review): the snippet's indentation is mangled and the `try` matching the
# `except` at the end is not shown; `df` is never assigned in the visible code.
async def read_blob_content(connection_string, container_name, blob_name, blob_type, LOCALFILENAME, dtype_param, encoding, skiprows=0, header=0, sep=","):

                # NOTE(review): `session` is never handed to the blob client below,
                # so opening or closing it does not affect the client's own transport.
                async with aiohttp.ClientSession() as session:
                    blob_client = BlobClient.from_connection_string(
                        conn_str=connection_string,
                        container_name=container_name,
                        blob_name=blob_name
                    )
                    print(f'Going to fetch blobPathName: {blob_name}')
                    # NOTE(review): `blob_client` is never closed in this snippet; the awaited
                    # calls suggest the async client -- presumably the source of the
                    # "Unclosed client session" warning; confirm.
                    blob_data = await blob_client.download_blob()
                    async with aiofiles.open(LOCALFILENAME, "wb") as my_blob:
                        # The whole blob is read into memory, then written in one call.
                        data = await blob_data.readall()
                        await my_blob.write(data)
                    await session.close()    # also tried placing this line one level up in the indentation
                success = True
            

            # open the file using pandas and process (placeholder; not shown)
                
        return df
    # Print the error, then re-raise it to the caller.
    except Exception as e:
        print(str(e))
        raise e

我尝试添加/删除

await session.close()
但没有成功。我还尝试将该行放置在与
async with aiohttp.ClientSession() as session

相同的缩进级别。
python python-3.x azure-functions
1个回答
0
投票

使用下面的代码,我能够获得预期的结果,而没有任何警告消息。

import logging
import azure.functions as func
import pandas as pd
import aiohttp
import aiofiles
from azure.storage.blob import BlobClient

# Storage account connection string used by get_blob; the value here is a
# masked placeholder.
connection_string = '*********'

# Function app object that the route decorators register handlers on;
# HTTP triggers default to anonymous access.
app = func.FunctionApp(
    http_auth_level=func.AuthLevel.ANONYMOUS,
)

@app.route(route="http_trigger")
async def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    """Download the blob named in the JSON request body and report the outcome.

    The body must be a JSON object with non-empty ``file_name_path``,
    ``file_name`` and ``account_id`` values. They are forwarded to
    ``get_blob``, with ``account_id`` used as the container name.

    Returns:
        200 once ``get_blob`` completes; 400 when the body is not a valid
        JSON object or a required field is missing; 500 when ``get_blob``
        raises.
    """
    logging.info('Python HTTP trigger function processed a request.')

    # A malformed body is the caller's mistake: answer 400 instead of letting
    # it fall through to the generic 500 handler.
    try:
        req_body = req.get_json()
    except ValueError:
        return func.HttpResponse("Request body must be valid JSON.", status_code=400)
    if not isinstance(req_body, dict):
        return func.HttpResponse("Request body must be a JSON object.", status_code=400)

    # Reject incomplete requests up front rather than handing None to the
    # storage client.
    required_fields = ('file_name_path', 'file_name', 'account_id')
    missing = [field for field in required_fields if not req_body.get(field)]
    if missing:
        return func.HttpResponse(
            f"Missing required field(s): {', '.join(missing)}",
            status_code=400,
        )

    file_name_path = req_body['file_name_path']
    file_name = req_body['file_name']
    account_id = req_body['account_id']

    try:
        inv = await get_blob(file_name_path, file_name, container_name=account_id)

        # Process inv
    except Exception:
        # logging.exception keeps the traceback, which logging.error(str(e)) drops.
        logging.exception("Failed to download or process %s", file_name)
        return func.HttpResponse("An error occurred while processing the request.", status_code=500)

    return func.HttpResponse(f"File {file_name} downloaded and processed successfully!", status_code=200)

async def get_blob(file_name_path, file_name, dtype_param=None, container_name='test', blob_type='csv', encoding='UTF-8', skiprows=0, header=0, sep=','):
    """Fetch a blob through ``read_blob_content`` and return its DataFrame.

    ``file_name`` is passed on as the blob name and ``file_name_path`` as the
    local path the blob is written to; the remaining arguments are forwarded
    unchanged. Any exception is logged and re-raised to the caller.
    """
    try:
        frame = await read_blob_content(
            connection_string,
            container_name,
            file_name,
            blob_type,
            file_name_path,
            dtype_param,
            encoding,
            skiprows,
            header,
            sep,
        )
    except Exception as err:
        logging.error(str(err))
        raise

    # Process the dataframe
    return frame

async def read_blob_content(connection_string, container_name, blob_name, blob_type, file_name_path, dtype_param, encoding, skiprows=0, header=0, sep=","):
    """Download a blob to ``file_name_path`` and load it as a CSV DataFrame.

    Args:
        connection_string: Storage account connection string.
        container_name: Container that holds the blob.
        blob_name: Name of the blob to download.
        blob_type: Accepted for interface compatibility; not used here, the
            content is always parsed with ``pd.read_csv``.
        file_name_path: Local path the blob bytes are written to and then
            read back from.
        dtype_param, encoding, skiprows, header, sep: Forwarded to
            ``pd.read_csv``.

    Returns:
        The DataFrame parsed from the downloaded file.

    Raises:
        Any exception raised while downloading or writing the blob is logged
        and re-raised; ``pd.read_csv`` errors propagate unchanged.
    """
    # Function-scope import: the module-level ``BlobClient`` is the synchronous
    # client, whose calls would block the event loop inside this coroutine.
    from azure.storage.blob.aio import BlobClient as AsyncBlobClient

    try:
        # ``async with`` closes the client and its HTTP transport on exit;
        # an async client left open is what triggers "Unclosed client session".
        # No separate aiohttp.ClientSession is needed: the client manages its own.
        async with AsyncBlobClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
            blob_name=blob_name,
        ) as blob_client:
            logging.info(f'Going to fetch blobPathName: {blob_name}')
            downloader = await blob_client.download_blob()
            data = await downloader.readall()

        async with aiofiles.open(file_name_path, mode='wb') as my_blob:
            await my_blob.write(data)
    except Exception:
        # Keep the traceback in the log and re-raise without rewriting it.
        logging.exception("Failed to download blob %s", blob_name)
        raise

    # Read the file using pandas and process
    df = pd.read_csv(file_name_path, dtype=dtype_param, encoding=encoding, skiprows=skiprows, header=header, sep=sep)

    return df

requirements.txt

azure-functions
pandas
aiohttp
aiofiles
azure-storage-blob
  • Blob 下载到本地。

enter image description here

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.