I have an Azure Function app in Python with a blob trigger. The function fires when a database file is uploaded to the following path in my storage account:
abc360/sqlite_db_file/{name}.db
The function reads the data from the db file, writes it to a CSV, and uploads the CSV back to a folder in the storage account container at the path:
abc360/sqlite_csv_files
The function reads the data from the db file, writes it out as CSV, and uploads the CSV in chunks using the azure-storage-blob Python library; with this I was able to cut the function's runtime from 6.5 minutes to 30 seconds.
I develop the function locally with Visual Studio Code and Azure Functions Core Tools, and deploy it to Azure with build and release pipelines. The code and pipelines are below:
import logging
import sqlite3
import os
import csv
import uuid
import tempfile
from azure.functions import InputStream
from azure.storage.blob import BlobServiceClient, BlobBlock


class DataMigrator:
    CONTAINER_NAME = "abc360/sqlite_csv_file"
    CHUNK_SIZE = 1024 * 1024 * 4  # 4MB chunk size

    def __init__(self, file_path, connection_string):
        self.file_path = file_path
        self.connection_string = connection_string

    def connect_sqlite(self):
        return sqlite3.connect(self.file_path)

    def get_table_names(self, cursor_sqlite):
        logging.info(cursor_sqlite)
        cursor_sqlite.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 100;")
        return cursor_sqlite.fetchall()

    def extract_info_from_filename(self, filename):
        parts = filename.split('_')
        project_code = parts[0]
        revision = parts[1]
        datestamp = parts[2].split('.')[0]
        return project_code, revision, datestamp

    def upload_file_chunks(self, blob_file_path, local_file_path, container_client):
        logging.info(blob_file_path)
        try:
            blob_client = container_client.get_blob_client(blob_file_path)
            # upload data
            block_list = []
            chunk_size = 1024 * 1024 * 4  # 4MB chunk size
            with open(local_file_path, 'rb') as f:
                offset = 0
                while True:
                    read_data = f.read(chunk_size)
                    if not read_data:
                        break  # done
                    blk_id = str(uuid.uuid4())
                    blob_client.stage_block(block_id=blk_id, data=read_data, length=len(read_data), offset=offset)
                    block_list.append(BlobBlock(block_id=blk_id))
                    offset += len(read_data)
            blob_client.commit_block_list(block_list)
        except Exception as err:
            print('Upload file error')
            print(err)

    def upload_to_storage_account(self, csv_filename):
        container_name = "abc360/sqlite_csv_file"
        blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(csv_filename)
        if blob_client.exists():
            blob_client.delete_blob()
        #self.upload_file_chunks(csv_filename, csv_filename, container_client)
        with open(csv_filename, "rb") as data:
            blob_client.upload_blob(data)
        return blob_client.url

    def write_to_csv(self, cursor_sqlite, table_name, filename):
        logging.info(f"file name: {filename}")
        project_code, revision, datestamp = self.extract_info_from_filename(filename)
        cursor_sqlite.execute(f"PRAGMA table_info({table_name});")
        columns_info = cursor_sqlite.fetchall()
        columns = [column_info[1] for column_info in columns_info]
        csv_filename = f"{table_name}_{project_code}_{revision}_{datestamp}.csv"
        with open(csv_filename, 'w', newline='', encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(['ProjectCode', 'Revision', 'DateStamp'] + columns)
            cursor_sqlite.execute(f"SELECT * FROM {table_name};")
            rows = cursor_sqlite.fetchall()
            for row in rows:
                csv_writer.writerow([project_code, revision, datestamp] + list(row))
        return csv_filename


def main(myblob: InputStream):
    conn_sqlite = None  # Initialize connection variable
    try:
        blob_name = os.path.basename(myblob.name)
        logging.info(f"Processing blob: {blob_name}")
        temp_file_path = os.path.join(tempfile.gettempdir(), blob_name)
        logging.info(temp_file_path)
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(myblob.read())
        connection_string = os.environ.get('AzureWebJobsStorage')
        if not connection_string:
            raise ValueError("Storage connection string is not provided.")
        migrator = DataMigrator(temp_file_path, connection_string)
        conn_sqlite = migrator.connect_sqlite()
        logging.info("Connected to SQLite database successfully")
        cursor_sqlite = conn_sqlite.cursor()
        tables = migrator.get_table_names(cursor_sqlite)
        logging.info(f"Tables in SQLite file: {tables}")
        for table in tables:
            table_name = table[0]
            csv_filename = migrator.write_to_csv(cursor_sqlite, table_name, blob_name)
            if csv_filename:
                csv_url = migrator.upload_to_storage_account(csv_filename)
                if csv_url:
                    logging.info(f"CSV file uploaded: {csv_url}")
    except Exception as e:
        logging.error(f"Error: {str(e)}")
    finally:
        # Close SQLite connection
        if conn_sqlite:
            conn_sqlite.close()


# Set up logging configuration
logging.basicConfig(level=logging.INFO)
function.json:
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "abc360/sqlite_db_file/{name}.db",
      "connection": "abc360stg_STORAGE"
    }
  ]
}
The storage account name is abc360stg, and the application settings look like this:
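A minimal sketch of what those settings presumably contain, based on the `AzureWebJobsStorage` lookup in the code and the `connection` name `abc360stg_STORAGE` in function.json, assuming both point at the abc360stg account (shown in local.settings.json form for local runs; account keys elided):

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=abc360stg;AccountKey=...;EndpointSuffix=core.windows.net",
    "abc360stg_STORAGE": "DefaultEndpointsProtocol=https;AccountName=abc360stg;AccountKey=...;EndpointSuffix=core.windows.net"
  }
}

The same two values also need to exist as application settings on the deployed function app, since local.settings.json is not published.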
Azure Git project directory structure:
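function.json points at `__init__.py` via `scriptFile` and the build installs dependencies from `./requirements.txt`, so the repository presumably follows the standard Python function app layout along these lines (the function folder name here is a placeholder, not the real one):

.
├── host.json
├── requirements.txt
└── SqliteToCsvTrigger/      # placeholder name for the function folder
    ├── __init__.py          # the code above
    └── function.json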
Build pipeline:
For the build pipeline I use the Python function app template:
Release pipeline:
Problem:
I'm fairly sure I've messed something up somewhere. I would really appreciate some help figuring this out.
I used the code below and deployed it through an Azure DevOps pipeline; the blob trigger works successfully and writes the db file out as a csv file:-
import os
import tempfile
import sqlite3
import csv
import logging
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.functions import InputStream


def main(myblob: InputStream):
    blob_name = os.path.basename(myblob.name)
    if blob_name.endswith('.db'):
        temp_dir = tempfile.mkdtemp()
        db_file_path = os.path.join(temp_dir, blob_name)
        with open(db_file_path, "wb") as file:
            file.write(myblob.read())
        conn = sqlite3.connect(db_file_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users")
        rows = cursor.fetchall()
        csv_file_path = os.path.join(temp_dir, "data.csv")
        with open(csv_file_path, "w", newline="") as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow([i[0] for i in cursor.description])
            csv_writer.writerows(rows)
        connection_string = "DefaultEndpointsProtocol=https;AccountName=valleystrg549;AccountKey=JxxxxxxxxxB2giy1t3+ASt5+17dA==;EndpointSuffix=core.windows.net"
        container_name = "abc"
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        blob_client = blob_service_client.get_blob_client(container=container_name, blob="abc360/sqlite_csv_files/" + blob_name.replace('.db', '.csv'))
        with open(csv_file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
        conn.close()
        os.remove(db_file_path)
        os.remove(csv_file_path)
My Azure DevOps YAML pipeline:-
trigger:
- master

variables:
  azureSubscription: 'aedxxxxxxxxxxxxf076e'
  functionAppName: 'siliconfunc65'
  vmImageName: 'ubuntu-latest'
  workingDirectory: '$(System.DefaultWorkingDirectory)/'

stages:
- stage: Build
  displayName: Build stage
  jobs:
  - job: Build
    displayName: Build
    pool:
      vmImage: $(vmImageName)
    steps:
    - bash: |
        if [ -f extensions.csproj ]
        then
            dotnet build extensions.csproj --runtime ubuntu.16.04-x64 --output ./bin
        fi
      workingDirectory: $(workingDirectory)
      displayName: 'Build extensions'
    - task: UsePythonVersion@0
      displayName: 'Use Python 3.11'
      inputs:
        versionSpec: 3.11
    - bash: |
        pip install --target="./.python_packages/lib/site-packages" -r ./requirements.txt
      workingDirectory: $(workingDirectory)
      displayName: 'Install application dependencies'
    - task: ArchiveFiles@2
      displayName: 'Archive files'
      inputs:
        rootFolderOrFile: '$(workingDirectory)'
        includeRootFolder: false
        archiveType: zip
        archiveFile: $(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip
        replaceExistingArchive: true
    - publish: $(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip
      artifact: drop

- stage: Deploy
  displayName: Deploy stage
  dependsOn: Build
  condition: succeeded()
  jobs:
  - deployment: Deploy
    displayName: Deploy
    environment: 'development'
    pool:
      vmImage: $(vmImageName)
    strategy:
      runOnce:
        deploy:
          steps:
          - task: AzureFunctionApp@1
            displayName: 'Azure functions app deploy'
            inputs:
              azureSubscription: '$(azureSubscription)'
              appType: functionAppLinux
              appName: $(functionAppName)
              package: '$(Pipeline.Workspace)/drop/$(Build.BuildId).zip'
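The 'Install application dependencies' step assumes a requirements.txt at the repository root; given the imports used in the function, it would presumably list at least the following (exact versions are an assumption):

azure-functions
azure-storage-blob>=12.0.0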
Azure Portal blob trigger invocation:-
Body- abc/sqlite_db_file/{name}.db
Output:-