使用azure数据工厂将dbf转换为csv

问题描述 投票:0回答:1

大家好,有人能帮我通过 Azure 数据工厂的自定义活动(使用 Azure Batch 帐户)将存储帐户中的 DBF 文件转换为 CSV 吗?请提供 Python 代码。

我创建了一个 Batch 帐户,并尝试通过 Azure 数据工厂的自定义活动执行存放在存储帐户中的 Python 文件(代码如下)。管道显示执行成功,但在存储帐户中找不到 CSV 文件。

import csv
from dbfread import DBF, FieldParser
from azure.storage.blob import BlobServiceClient

# Azure Storage account details -- substitute your own values here
account_name = "my_account_name"
account_key = "my_account_key"
container_name = "dbf"
blob_name = "test.dbf"

# Build the Blob Storage service client from a connection string
connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Fetch the DBF blob and store it under a relative local file name
downloaded_file = "downloaded_file.dbf"
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

try:
    # The local file is opened first, then the blob content is read and written.
    with open(downloaded_file, "wb") as download_file:
        dbf_bytes = blob_client.download_blob().readall()
        download_file.write(dbf_bytes)
except Exception as e:
    # Report the problem and stop; nothing after this point can work without the file.
    print(f"Error downloading DBF file: {e}")
    exit()

# Convert DBF to CSV
class CustomFieldParser(FieldParser):
    """FieldParser subclass with a lenient ``_parse_memo_index``.

    The override returns an integer when the value converts cleanly and
    otherwise returns the value decoded as text.
    """

    def _parse_memo_index(self, data):
        """Return ``data`` as an int, or as decoded text if it is not numeric.

        ``data`` is presumably raw bytes taken from the DBF record (the
        fallback calls ``.decode``) -- confirm against dbfread.
        """
        try:
            memo_index = int(data)
        except ValueError:
            # Not an integer: decode as UTF-8, substituting undecodable bytes.
            memo_index = data.decode('utf-8', errors='replace')
        return memo_index

# Open the downloaded DBF table using the lenient field parser
try:
    dbf_file = DBF(downloaded_file, parserclass=CustomFieldParser)
except Exception as e:
    print(f"Error reading DBF file: {e}")
    exit()

# Relative file name of the CSV written by this script
csv_file = "converted_file.csv"

try:
    with open(csv_file, "w", newline="", encoding="utf-8") as out_handle:
        csv_writer = csv.writer(out_handle)

        # Header row first, then one CSV row per DBF record
        csv_writer.writerow(dbf_file.field_names)
        csv_writer.writerows(list(record.values()) for record in dbf_file)

    print(f"Conversion completed. CSV file saved as {csv_file}")

except Exception as e:
    print(f"Error writing CSV file: {e}")
azure-data-factory dbf
1个回答
0
投票

您提供的代码把 DBF 文件转换成了本地路径下的 CSV 文件。由于代码没有指定 CSV 文件在存储帐户中的保存位置,CSV 文件只被写到了下载 DBF 文件的本地目录中。这很可能就是管道执行成功、但存储帐户中没有 CSV 文件的原因。请使用以下代码,将 CSV 数据上传到 Blob 存储中指定的位置:

# Convert to CSV
# io is imported here because the script this snippet extends only imports csv.
import io

# Render the rows with csv.writer rather than a plain ','.join: values that
# contain commas, quotes or line breaks are quoted/escaped, and None is
# written as an empty field instead of the text "None".
csv_buffer = io.StringIO(newline="")
writer = csv.writer(csv_buffer, lineterminator="\n")
writer.writerow(dbf_file.field_names)  # Add header

for record in dbf_file:
    writer.writerow(record.values())

# Convert CSV data to string
csv_string = csv_buffer.getvalue()

# Upload CSV data to blob
# cblob_name is the destination blob name, e.g. "converted_file.csv".
try:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=cblob_name)
    blob_client.upload_blob(csv_string, overwrite=True)

    print(f"Conversion completed. CSV file saved as {cblob_name} in container {container_name}")

except Exception as e:
    print(f"Error uploading CSV file to blob storage: {e}")
    # Exit with a non-zero status so the failure is not reported as success.
    raise SystemExit(1)

这样即可将 DBF 文件转换为 CSV 文件并保存到存储帐户中,如下所示:

enter image description here

以下是完整代码供您参考:

import csv
import sys
from dbfread import DBF, FieldParser
from azure.storage.blob import BlobServiceClient

# Replace these with your actual Azure Storage account information.
# NOTE: account_key must contain ONLY the key value. The connection string
# below already appends ";EndpointSuffix=core.windows.net", so including that
# suffix in the key would duplicate the setting. Never publish a real key.
account_name = "bhanustoree"
account_key = "my_account_key"
container_name = "files"
blob_name = 'marks.dbf'
cblob_name = "converted_file.csv"  # Name of the blob where you want to save the CSV file

# Connection string for Azure Blob Storage
connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Download the DBF file from Azure Blob Storage
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
downloaded_file = "downloaded1_file.dbf"

try:
    with open(downloaded_file, "wb") as download_file:
        download_file.write(blob_client.download_blob().readall())
except Exception as e:
    print(f"Error downloading DBF file: {e}")
    # A bare sys.exit() ends the process with status 0 (success). Exit with 1
    # so whatever launched the script can tell that the download failed.
    sys.exit(1)

# Convert DBF to CSV
class CustomFieldParser(FieldParser):
    """FieldParser subclass with a lenient ``_parse_memo_index``.

    The override returns an integer when the value converts cleanly and
    otherwise returns the value decoded as text.
    """

    def _parse_memo_index(self, data):
        """Return ``data`` as an int, or as decoded text if it is not numeric.

        ``data`` is presumably raw bytes taken from the DBF record (the
        fallback calls ``.decode``) -- confirm against dbfread.
        """
        try:
            memo_index = int(data)
        except ValueError:
            # Not an integer: decode as UTF-8, substituting undecodable bytes.
            memo_index = data.decode('utf-8', errors='replace')
        return memo_index

# io is imported here so this section works without editing the import header.
import io

try:
    dbf_file = DBF(downloaded_file, parserclass=CustomFieldParser)
except Exception as e:
    print(f"Error reading DBF file: {e}")
    # Exit with a non-zero status; a bare sys.exit() would report success.
    sys.exit(1)

# Convert to CSV
# Render the rows with csv.writer rather than a plain ','.join: values that
# contain commas, quotes or line breaks are quoted/escaped, and None is
# written as an empty field instead of the text "None".
csv_buffer = io.StringIO(newline="")
writer = csv.writer(csv_buffer, lineterminator="\n")
writer.writerow(dbf_file.field_names)  # Add header

for record in dbf_file:
    writer.writerow(record.values())

# Convert CSV data to string
csv_string = csv_buffer.getvalue()

# Upload CSV data to blob
try:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=cblob_name)
    blob_client.upload_blob(csv_string, overwrite=True)

    print(f"Conversion completed. CSV file saved as {cblob_name} in container {container_name}")

except Exception as e:
    print(f"Error uploading CSV file to blob storage: {e}")
    # The upload is the whole point of the script, so a failure here must not
    # end with exit status 0.
    sys.exit(1)
© www.soinside.com 2019 - 2024. All rights reserved.