大家好,有人能帮我通过 Azure 数据工厂(Azure Data Factory)的自定义活动,借助 Azure Batch 帐户把存储帐户中的 DBF 文件转换为 CSV 吗?请提供 Python 代码。
我已经创建了一个 Batch 帐户,并尝试通过 Azure 数据工厂的自定义活动执行存放在存储帐户中的 Python 文件(代码如下)。活动显示执行成功,但在存储帐户中找不到生成的 CSV 文件。
import csv
from dbfread import DBF, FieldParser
from azure.storage.blob import BlobServiceClient
# Storage account settings -- substitute the values for your own account.
account_name = "my_account_name"
account_key = "my_account_key"
container_name = "dbf"
blob_name = "test.dbf"

# Assemble the connection string and create the service-level client from it.
connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Fetch the DBF blob and store its bytes in a local file.
downloaded_file = "downloaded_file.dbf"
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
try:
    with open(downloaded_file, "wb") as local_dbf:
        dbf_bytes = blob_client.download_blob().readall()
        local_dbf.write(dbf_bytes)
except Exception as err:
    # Report the failure and stop; nothing below can work without the file.
    print(f"Error downloading DBF file: {err}")
    exit()
# Convert DBF to CSV
class CustomFieldParser(FieldParser):
    """Field parser that tolerates memo indexes which are not integers."""

    def _parse_memo_index(self, data):
        """Return ``data`` as an int, or as decoded text if it is not numeric.

        When ``int(data)`` raises ``ValueError`` the raw value is decoded as
        UTF-8, with undecodable bytes replaced, and returned as a string.
        """
        # NOTE(review): presumably ``data`` is the raw bytes of a memo field;
        # confirm that the base parser copes with a str index.
        try:
            memo_index = int(data)
        except ValueError:
            memo_index = data.decode('utf-8', errors='replace')
        return memo_index
# Open the downloaded table with the custom parser; stop if it cannot be read.
try:
    dbf_file = DBF(downloaded_file, parserclass=CustomFieldParser)
except Exception as err:
    print(f"Error reading DBF file: {err}")
    exit()

# Dump the table to a CSV file on the local disk.
# NOTE: nothing in this block uploads the result to blob storage.
csv_file = "converted_file.csv"
try:
    with open(csv_file, "w", newline="", encoding="utf-8") as out_handle:
        csv_writer = csv.writer(out_handle)
        csv_writer.writerow(dbf_file.field_names)  # header row
        csv_writer.writerows(list(row.values()) for row in dbf_file)  # data rows
    print(f"Conversion completed. CSV file saved as {csv_file}")
except Exception as err:
    print(f"Error writing CSV file: {err}")
您提供的代码把 DBF 文件转换成了本地路径下的 CSV 文件。代码中没有指定 CSV 文件应保存到存储帐户中的哪个位置,因此 CSV 文件只保存在下载 DBF 文件的本地目录中。这很可能就是管道执行成功、但存储帐户中找不到 CSV 文件的原因。请使用以下代码,将 CSV 数据上传到 Blob 存储中的指定位置:
import csv
import io

# Name of the blob that will receive the CSV output.
cblob_name = "converted_file.csv"

# Convert to CSV
# Serialize with the csv module so that values containing commas, quotes or
# line breaks are quoted correctly; None values are written as empty fields.
csv_buffer = io.StringIO(newline="")
writer = csv.writer(csv_buffer, lineterminator="\n")
writer.writerow(dbf_file.field_names)  # Add header
for record in dbf_file:
    writer.writerow(list(record.values()))
# Convert CSV data to string
csv_string = csv_buffer.getvalue()

# Upload CSV data to blob
try:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=cblob_name)
    blob_client.upload_blob(csv_string, overwrite=True)
    print(f"Conversion completed. CSV file saved as {cblob_name} in container {container_name}")
except Exception as e:
    print(f"Error uploading CSV file to blob storage: {e}")
    # Re-raise so the script ends with a non-zero exit status instead of
    # looking successful when no CSV was uploaded.
    raise
这样即可将 DBF 文件转换为 CSV 文件,并将其保存到存储帐户中,如下所示:
以下是完整代码供您参考:
import csv
import io
import os
import sys

from azure.storage.blob import BlobServiceClient
from dbfread import DBF, FieldParser
# Storage account settings. The credentials are read from the environment so
# that no account key is ever stored in (or published with) the script.
# NOTE: any key that was previously hard-coded here should be treated as
# exposed and rotated.
try:
    account_name = os.environ["AZURE_STORAGE_ACCOUNT"]
    account_key = os.environ["AZURE_STORAGE_KEY"]
except KeyError as missing:
    # A string argument is written to stderr and the exit status is 1.
    sys.exit(f"Missing required environment variable: {missing}")
container_name = "files"
blob_name = 'marks.dbf'
cblob_name = "converted_file.csv"  # Name of the blob where you want to save the CSV file
# Connection string for Azure Blob Storage. The key must be the bare key only:
# appending ";EndpointSuffix=..." to it would duplicate the field added below.
connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
# Download the DBF file from Azure Blob Storage
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
downloaded_file = "downloaded1_file.dbf"
try:
    with open(downloaded_file, "wb") as download_file:
        # Stream straight into the file instead of buffering the whole blob
        # in memory first.
        blob_client.download_blob().readinto(download_file)
except Exception as e:
    # A string argument is written to stderr and the exit status is 1, so a
    # failed download is no longer reported as a successful run.
    sys.exit(f"Error downloading DBF file: {e}")
# Convert DBF to CSV
class CustomFieldParser(FieldParser):
    """Parser variant that falls back to text for non-integer memo indexes."""

    def _parse_memo_index(self, data):
        """Convert ``data`` to an int; on ``ValueError`` return it as text.

        The fallback decodes ``data`` as UTF-8 and substitutes a replacement
        character for any byte sequence that cannot be decoded.
        """
        # NOTE(review): presumably ``data`` holds the raw bytes of a memo
        # field; verify that the base parser accepts a str index.
        try:
            parsed_index = int(data)
        except ValueError:
            parsed_index = data.decode('utf-8', errors='replace')
        return parsed_index
# Open the downloaded table using the custom field parser.
try:
    dbf_file = DBF(downloaded_file, parserclass=CustomFieldParser)
except Exception as e:
    # A string argument is written to stderr and the exit status is 1, so an
    # unreadable DBF file is no longer reported as a successful run.
    sys.exit(f"Error reading DBF file: {e}")
# Convert to CSV
# The csv module quotes values that contain commas, quotes or line breaks
# and writes None as an empty field.
csv_buffer = io.StringIO(newline="")
csv_writer = csv.writer(csv_buffer, lineterminator="\n")
csv_writer.writerow(dbf_file.field_names)  # Add header
csv_writer.writerows(record.values() for record in dbf_file)
# Convert CSV data to string
csv_string = csv_buffer.getvalue()
# Upload CSV data to blob
try:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=cblob_name)
    blob_client.upload_blob(csv_string, overwrite=True)
except Exception as e:
    # A string argument is written to stderr and the exit status is 1, so a
    # failed upload is no longer swallowed and reported as a successful run.
    sys.exit(f"Error uploading CSV file to blob storage: {e}")
else:
    print(f"Conversion completed. CSV file saved as {cblob_name} in container {container_name}")