我编写了一个小函数来从 CSV 文件读取数据并将输出存储在格式化的 Excel 工作簿中。该代码将在 Spark 群集上运行的 Azure Databricks 笔记本中运行。我应该如何优化这个功能?
!pip install xlsxwriter --quiet --disable-pip-version-check
import pandas as pd
import numpy as np
import xlsxwriter
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, to_date
from pyspark.sql.types import DecimalType, DateType, TimestampType
import shutil
def check_lists_length(*lists):
    """Return True if the given lists do NOT all share the same length.

    With zero or one argument there is nothing to mismatch, so False is returned.
    """
    # Collapsing every length into a set leaves at most one entry when all match.
    return len({len(lst) for lst in lists}) > 1
def client_file_creator_from_csv(sheet_names, table_names, file_names, csv_list, output_path):
    """
    Read a list of CSV files and write formatted Excel workbooks in Azure Databricks.

    Files are processed one at a time so only a single pandas DataFrame lives on
    the driver at any moment (the original buffered every DataFrame and every
    ExcelWriter in lists, which spiked driver RAM).

    Parameters:
        sheet_names (list): Worksheet names of the final workbooks
        table_names (list): Table headers of the final workbooks
        file_names (list): File names for the final workbooks
        csv_list (list): Input paths of the CSV files
        output_path (string): Output path for the Excel workbooks

    Raises:
        Exception: if the input lists differ in length or a CSV has no rows.
    """
    if check_lists_length(sheet_names, table_names, file_names, csv_list):
        raise Exception("The number of CSV files and file names and sheet names and table names must be the same")
    # Convert CSV file paths to dbfs: URI form for use with dbutils and pyspark.
    csv_list = ["dbfs:" + x.split("dbfs")[1] for x in csv_list]
    # Convert the output path the same way and make sure it exists.
    path_api = "dbfs:" + output_path.split("dbfs")[1]
    dbutils.fs.mkdirs(path_api)
    # Per-user temp prefix on the driver's local disk for intermediate .xlsx files.
    user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
    temp_path = "/tmp/" + user.split('@')[0]
    for i, (csv_path, sheet_name, table_name, file_name) in enumerate(
            zip(csv_list, sheet_names, table_names, file_names)):
        sdf = (spark.read
               .option("header", True)
               .option("nullValue", "null")
               .option("delimiter", ",")
               .option("quote", "\"")
               .option("escape", "\"")
               .option("multiLine", True)
               .option("inferSchema", True)
               .option("enforceSchema", True)
               .option("mode", "DROPMALFORMED")
               .csv(csv_path))
        # head(1) avoids the full-table scan that count() performs just to test emptiness.
        if len(sdf.head(1)) == 0:
            raise Exception('File does not have any rows')
        # Normalize types for Excel: decimals -> doubles rounded to 2 places,
        # timestamps -> dates.
        for field in sdf.schema.fields:
            if isinstance(field.dataType, DecimalType):
                sdf = sdf.withColumn(field.name, round(col(field.name).cast('double'), 2))
            elif isinstance(field.dataType, TimestampType):
                sdf = sdf.withColumn(field.name, to_date(col(field.name)))
        pdf = sdf.toPandas()
        temp_file = f"{temp_path}_{i}.xlsx"
        # The context manager closes (and saves) the workbook on exit. The original
        # passed the ExcelWriter OBJECT to shutil.copy, which requires a path —
        # copying the temp file's path fixes that TypeError.
        with pd.ExcelWriter(temp_file, engine="xlsxwriter") as writer:
            _write_formatted_sheets(writer, pdf, sheet_name, table_name)
        shutil.copy(temp_file, output_path + file_name + ".xlsx")
        print(f"File {file_name}.xlsx created successfully")
        # Drop the pandas copy before reading the next CSV to keep peak RAM low.
        del pdf


def _write_formatted_sheets(writer, df, sheet_name, table_name):
    """Write df into writer with the standard border and title formatting.

    Excel caps a worksheet at 1,048,576 rows, so the frame is split into
    1,000,000-row chunks, each written to its own worksheet
    ("name", "name_1", "name_2", ...).
    """
    workbook = writer.book
    # Standard formats shared by every worksheet of this workbook.
    top_border = workbook.add_format({'top': 2})
    bottom_border = workbook.add_format({'bottom': 2})
    left_border = workbook.add_format({'left': 2})
    right_border = workbook.add_format({'right': 2})
    top_left_border = workbook.add_format({'top': 2, 'left': 2})
    top_right_border = workbook.add_format({'top': 2, 'right': 2})
    bottom_left_border = workbook.add_format({'bottom': 2, 'left': 2})
    bottom_right_border = workbook.add_format({'bottom': 2, 'right': 2})
    table_title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': '#305496', 'align': 'left', 'valign': 'vcenter'})
    title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': 'black', 'align': 'left', 'valign': 'vcenter'})
    for j, part in enumerate(np.array_split(df, (df.shape[0] // 1000000) + 1)):
        suffix = "_" + str(j) if j > 0 else ""
        part.to_excel(writer, sheet_name=sheet_name + suffix, index=False, header=False, startrow=4, startcol=1)
        worksheet = writer.sheets[sheet_name + suffix]
        # Write the header row manually so it carries the table-title format.
        for col_num, value in enumerate(part.columns.values):
            worksheet.write(3, col_num + 1, value, table_title_format)
        num_rows, num_cols = part.shape
        # Draw a thick border around the data region via conditional formats.
        worksheet.conditional_format(3, 2, 3, num_cols - 1, {"type": "no_errors", "format": top_border})
        worksheet.conditional_format(4, 1, num_rows + 2, 1, {"type": "no_errors", "format": left_border})
        worksheet.conditional_format(4, num_cols, num_rows + 2, num_cols, {"type": "no_errors", "format": right_border})
        worksheet.conditional_format(num_rows + 3, 2, num_rows + 3, num_cols - 1, {"type": "no_errors", "format": bottom_border})
        worksheet.conditional_format(3, 1, 3, 1, {"type": "no_errors", "format": top_left_border})
        worksheet.conditional_format(3, num_cols, 3, num_cols, {"type": "no_errors", "format": top_right_border})
        worksheet.conditional_format(num_rows + 3, 1, num_rows + 3, 1, {"type": "no_errors", "format": bottom_left_border})
        worksheet.conditional_format(num_rows + 3, num_cols, num_rows + 3, num_cols, {"type": "no_errors", "format": bottom_right_border})
        worksheet.hide_gridlines(2)
        worksheet.autofilter(3, 1, num_rows + 3, num_cols)
        worksheet.autofit()
        worksheet.set_column_pixels(0, 0, 20)
        worksheet.merge_range(1, 1, 1, num_cols, table_name + suffix, title_format)
我尝试了几个优化方案,但效果不佳。如果有人对如何改进此函数有想法或建议,我非常欢迎。目前驱动程序节点过载,并且由于 pandas 和 xlsxwriter 都在驱动节点上处理全部数据,RAM 使用量出现峰值。
openpyxl 和 xlsxwriter 都只作用于 pandas 数据帧,因此都在驱动节点上运行。您可以改用 Spark 的 Excel 连接器(Maven 坐标 com.crealytics:spark-excel),它可以直接从 Spark DataFrame 分布式写入;但该连接器不带格式化功能。对于格式设置,可以考虑使用预先做好格式的 Excel 模板:先复制模板,再用数据更新副本。该库可以在集群级别安装(集群 Libraries → Install New → Maven)。安装后,您的写入代码将如下所示:
# Destination in ADLS Gen2 (abfss scheme) for the generated workbook.
path = "abfss://<container_name>@<storage_name>.dfs.core.windows.net/testdir/test.xlsx"
# Write the Spark DataFrame to Excel through the com.crealytics spark-excel
# connector (must be installed on the cluster), appending the data starting at
# cell A1 of sheet "My Sheet1" — no driver-side pandas collect involved.
spark_Df1.write.format("com.crealytics.spark.excel")\
.option("header", "true")\
.option("dataAddress", "'My Sheet1'!A1")\
.mode("append")\
.save(path)
xlsxwriter 的问题是它只能写入而不能读取/更新。所以另一种选择是先用 spark-excel 连接器写入数据,然后使用 openpyxl 读取文件并追加格式;openpyxl 官方文档中关于加载(load_workbook)和修改现有工作簿的章节给出了具体做法。
希望这些选项之一能够提高性能。