CSV file is uploaded to the local drive instead of the storage account by a Python function app

Problem description (votes: 0, answers: 1)

I have an Azure Function app in Python with a blob trigger. It fires a function when a database file is uploaded to the following path in my storage account:

abc360/sqlite_db_file/{name}.db

The function reads the data from the db file, writes it to a CSV, and uploads the CSV back to a folder in the storage account container at the path:

abc360/sqlite_csv_files

The function reads the data from the db file, writes it to a CSV, and uploads the CSV in chunks using the Azure Storage Blob Python library. That way I was able to bring the function's runtime down from 6.5 minutes to 30 seconds.

I develop the function locally with Visual Studio Code and Azure Functions Core Tools, and deploy it to Azure with build and release pipelines. The code and pipelines are below:

import logging
import sqlite3
import os
import csv
import uuid
import tempfile
from azure.functions import InputStream
from azure.storage.blob import BlobServiceClient, BlobBlock

class DataMigrator:
    CONTAINER_NAME = "abc360/sqlite_csv_file"
    CHUNK_SIZE = 1024 * 1024 * 4  # 4MB chunk size

    def __init__(self, file_path, connection_string):
        self.file_path = file_path
        self.connection_string = connection_string

    def connect_sqlite(self):
        return sqlite3.connect(self.file_path)

    def get_table_names(self, cursor_sqlite):
        logging.info(cursor_sqlite)
        cursor_sqlite.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 100;")
        return cursor_sqlite.fetchall()

    def extract_info_from_filename(self, filename):
        parts = filename.split('_')
        project_code = parts[0]
        revision = parts[1]
        datestamp = parts[2].split('.')[0]
        return project_code, revision, datestamp

    def upload_file_chunks(self, blob_file_path, local_file_path, container_client):
        logging.info(blob_file_path)
        try:
            blob_client = container_client.get_blob_client(blob_file_path)
            # upload data
            block_list = []
            chunk_size = 1024 * 1024 * 4  # 4MB chunk size
            with open(local_file_path, 'rb') as f:
                offset = 0
                while True:
                    read_data = f.read(chunk_size)
                    if not read_data:
                        break  # done
                    blk_id = str(uuid.uuid4())
                    blob_client.stage_block(block_id=blk_id, data=read_data, length=len(read_data), offset=offset)
                    block_list.append(BlobBlock(block_id=blk_id))
                    offset += len(read_data)
            blob_client.commit_block_list(block_list)
        except Exception as err:
            print('Upload file error')
            print(err)

    def upload_to_storage_account(self, csv_filename):
        container_name = "abc360/sqlite_csv_file"

        blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)
        container_client = blob_service_client.get_container_client(container_name)

        blob_client = container_client.get_blob_client(csv_filename)
        if blob_client.exists():
            blob_client.delete_blob()

        #self.upload_file_chunks(csv_filename, csv_filename, container_client)
        with open(csv_filename, "rb") as data:
            blob_client.upload_blob(data)

        return blob_client.url

    def write_to_csv(self, cursor_sqlite, table_name, filename):
        logging.info(f"file name: {filename}")
        project_code, revision, datestamp = self.extract_info_from_filename(filename)

        cursor_sqlite.execute(f"PRAGMA table_info({table_name});")
        columns_info = cursor_sqlite.fetchall()
        columns = [column_info[1] for column_info in columns_info]

        csv_filename = f"{table_name}_{project_code}_{revision}_{datestamp}.csv"
        with open(csv_filename, 'w', newline='',encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(['ProjectCode', 'Revision', 'DateStamp'] + columns)
        
            cursor_sqlite.execute(f"SELECT * FROM {table_name};")
            rows = cursor_sqlite.fetchall()
            for row in rows:
                csv_writer.writerow([project_code, revision, datestamp] + list(row))

        return csv_filename


def main(myblob: InputStream):
    conn_sqlite = None  # Initialize connection variable
    try:
        blob_name = os.path.basename(myblob.name)
        logging.info(f"Processing blob: {blob_name}")

        temp_file_path = os.path.join(tempfile.gettempdir(), blob_name)
        logging.info(temp_file_path)
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(myblob.read())

        connection_string = os.environ.get('AzureWebJobsStorage')
        if not connection_string:
            raise ValueError("Storage connection string is not provided.")

        migrator = DataMigrator(temp_file_path, connection_string)
        conn_sqlite = migrator.connect_sqlite()
        logging.info("Connected to SQLite database successfully")
        cursor_sqlite = conn_sqlite.cursor()

        tables = migrator.get_table_names(cursor_sqlite)
        logging.info(f"Tables in SQLite file: {tables}")

        for table in tables:
            table_name = table[0]
            csv_filename = migrator.write_to_csv(cursor_sqlite, table_name, blob_name)
            if csv_filename:
                csv_url = migrator.upload_to_storage_account(csv_filename)
                if csv_url:
                    logging.info(f"CSV file uploaded: {csv_url}")
    except Exception as e:
        logging.error(f"Error: {str(e)}")
    finally:
        # Close SQLite connection
        if conn_sqlite:
            conn_sqlite.close()

# Set up logging configuration
logging.basicConfig(level=logging.INFO)

function.json:

{
  "scriptFile": "__init__.py",
  "bindings": [
   {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "abc360/sqlite_db_file/{name}.db",
      "connection": "abc360stg_STORAGE"
   }
  ]
}

The storage account name is abc360stg, and the application settings look like this:
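Roughly, the relevant settings (shown here with placeholder values in local.settings.json form) are the AzureWebJobsStorage setting read by the code and the binding's connection setting abc360stg_STORAGE from function.json; the same two keys need to exist as Application settings on the deployed function app:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=abc360stg;AccountKey=<key>;EndpointSuffix=core.windows.net",
    "abc360stg_STORAGE": "DefaultEndpointsProtocol=https;AccountName=abc360stg;AccountKey=<key>;EndpointSuffix=core.windows.net"
  }
}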

Azure Git project directory structure:
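Roughly, the layout follows the standard Python Functions structure (the function folder name BlobTriggerFunc below is just a placeholder):

<project-root>/
├── BlobTriggerFunc/        # placeholder name; holds the blob-triggered function
│   ├── __init__.py         # the code shown above
│   └── function.json
├── host.json
├── local.settings.json     # local-only settings, not deployed
└── requirements.txt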

Build pipeline:

For the build pipeline I use the Python Function App template:

Release pipeline:

Problems:

  1. The CSV files are being uploaded to the folder on my local drive where the code lives, instead of to the storage account.

  2. Also, the function app does not get triggered when a db file is uploaded to the sqlite_db_file folder.

I'm fairly sure I've messed something up somewhere. Any help in sorting this out would be greatly appreciated.

python azure-functions azure-blob-storage azure-python-sdk
1 Answer

0 votes

I used the code below and deployed it through an Azure DevOps pipeline; the blob trigger fired successfully and wrote the db file's contents to a CSV file:

import os
import tempfile
import sqlite3
import csv
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.functions import InputStream

def main(myblob: InputStream):
    blob_name = os.path.basename(myblob.name)
    if blob_name.endswith('.db'):
        # Save the triggering .db blob to a temporary directory
        temp_dir = tempfile.mkdtemp()
        db_file_path = os.path.join(temp_dir, blob_name)

        with open(db_file_path, "wb") as file:
            file.write(myblob.read())

        # Read all rows from the users table in the SQLite file
        conn = sqlite3.connect(db_file_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users")
        rows = cursor.fetchall()

        # Write the rows to a CSV file with a header row
        csv_file_path = os.path.join(temp_dir, "data.csv")
        with open(csv_file_path, "w", newline="") as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow([i[0] for i in cursor.description])
            csv_writer.writerows(rows)

        # Upload the CSV to abc360/sqlite_csv_files in the storage container
        connection_string = "DefaultEndpointsProtocol=https;AccountName=valleystrg549;AccountKey=JxxxxxxxxxB2giy1t3+ASt5+17dA==;EndpointSuffix=core.windows.net"
        container_name = "abc"
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        blob_client = blob_service_client.get_blob_client(container=container_name, blob="abc360/sqlite_csv_files/" + blob_name.replace('.db', '.csv'))

        with open(csv_file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)

        # Clean up the temporary files and close the connection
        conn.close()
        os.remove(db_file_path)
        os.remove(csv_file_path)

My Azure DevOps YAML pipeline:

trigger:
- master

variables:
  azureSubscription: 'aedxxxxxxxxxxxxf076e'
  functionAppName: 'siliconfunc65'
  vmImageName: 'ubuntu-latest'
  workingDirectory: '$(System.DefaultWorkingDirectory)/'

stages:
- stage: Build
  displayName: Build stage

  jobs:
  - job: Build
    displayName: Build
    pool:
      vmImage: $(vmImageName)

    steps:
    - bash: |
        if [ -f extensions.csproj ]
        then
            dotnet build extensions.csproj --runtime ubuntu.16.04-x64 --output ./bin
        fi
      workingDirectory: $(workingDirectory)
      displayName: 'Build extensions'

    - task: UsePythonVersion@0
      displayName: 'Use Python 3.11'
      inputs:
        versionSpec: 3.11 
    - bash: |
        pip install --target="./.python_packages/lib/site-packages" -r ./requirements.txt
      workingDirectory: $(workingDirectory)
      displayName: 'Install application dependencies'

    - task: ArchiveFiles@2
      displayName: 'Archive files'
      inputs:
        rootFolderOrFile: '$(workingDirectory)'
        includeRootFolder: false
        archiveType: zip
        archiveFile: $(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip
        replaceExistingArchive: true

    - publish: $(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip
      artifact: drop

- stage: Deploy
  displayName: Deploy stage
  dependsOn: Build
  condition: succeeded()

  jobs:
  - deployment: Deploy
    displayName: Deploy
    environment: 'development'
    pool:
      vmImage: $(vmImageName)

    strategy:
      runOnce:
        deploy:

          steps:
          - task: AzureFunctionApp@1
            displayName: 'Azure functions app deploy'
            inputs:
              azureSubscription: '$(azureSubscription)'
              appType: functionAppLinux
              appName: $(functionAppName)
              package: '$(Pipeline.Workspace)/drop/$(Build.BuildId).zip'

Blob trigger invocation from the Azure Portal:

Body: abc/sqlite_db_file/{name}.db

Output:

[Output screenshots]
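For reference, the trigger can also be fired without the portal by uploading a matching .db blob into the watched path with the storage SDK; a minimal sketch, where the file name test.db and the connection string are placeholders and the container and path come from the binding above:

from azure.storage.blob import BlobServiceClient

# Upload a test .db file into the path the blob trigger watches: abc/sqlite_db_file/{name}.db
connection_string = "<storage account connection string>"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
blob_client = blob_service_client.get_blob_client(container="abc", blob="sqlite_db_file/test.db")

with open("test.db", "rb") as data:
    blob_client.upload_blob(data, overwrite=True)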
