Optimizing the reading and formatting of Excel files in Databricks

Question

I wrote a small function that reads data from CSV files and stores the output in formatted Excel workbooks. The code runs in an Azure Databricks notebook on a Spark cluster. How should I optimize this function?


!pip install xlsxwriter --quiet --disable-pip-version-check

import shutil

import numpy as np
import pandas as pd
import xlsxwriter
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, to_date
from pyspark.sql.types import DecimalType, DateType, TimestampType


def check_lists_length(*lists):
  #Returns True if the lists do NOT all have the same length
  return not all(len(lst) == len(lists[0]) for lst in lists)


def client_file_creator_from_csv(sheet_names, table_names, file_names, csv_list, output_path):
  """
  Reads a list of CSV files and returns formatted Excel workbooks in Azure Databricks

  Parameters:
  sheet_names (list): A list of strings representing worksheet names of the final workbooks
  table_names (list): A list of strings representing table headers of the final workbooks
  file_names (list): A list of strings representing file names for the final workbooks
  csv_list (list): A list of strings representing the input paths of the CSV files
  output_path (string): A string representing the output path for the Excel workbooks
  """
  if check_lists_length(sheet_names, table_names, file_names, csv_list):
    raise Exception("The number of CSV files, file names, sheet names and table names must be the same")

  #Convert CSV file paths to dbfs format for use with dbutils and pyspark
  csv_list = ["dbfs:" + x.split("dbfs")[1] for x in csv_list]
  #Convert the output file path to dbfs format for use with dbutils and pyspark
  path_api = "dbfs:" + output_path.split("dbfs")[1]
  #Create the output path in case it doesn't exist
  dbutils.fs.mkdirs(path_api)
  #Create a temporary path on the driver's local disk to store the intermediate Excel files
  temp_path = "/tmp/" + dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get().split("@")[0]
  #Create lists to store the pandas dataframes and ExcelWriter objects
  dfs = list()
  writers = list()
  for i, csv in enumerate(csv_list):
    df = (spark.read
          .option("header", True)
          .option("nullValue", "null")
          .option("delimiter", ",")
          .option("quote", "\"")
          .option("escape", "\"")
          .option("multiLine", True)
          .option("inferSchema", True)
          .option("enforceSchema", True)
          .option("mode", "DROPMALFORMED")
          .csv(csv))
    if df.count() == 0:
      raise Exception("File does not have any rows")
    dcml_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, DecimalType)]
    time_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, TimestampType)]
    #Round decimal columns to two decimal places and truncate timestamps to dates
    for col_name in dcml_cols:
      df = df.withColumn(col_name, round(col(col_name).cast("double"), 2))
    for col_name in time_cols:
      df = df.withColumn(col_name, to_date(col(col_name)))
    dfs.append(df.toPandas())
    writers.append(pd.ExcelWriter(f"{temp_path}_{i}.xlsx", engine="xlsxwriter"))

  for i, (df, writer, sheet_name, table_name, file_name) in enumerate(zip(dfs, writers, sheet_names, table_names, file_names)):
    #Since Excel has a row limit of 1,048,576 rows per worksheet, larger dataframes must be split across multiple worksheets
    split_df = np.array_split(df, (df.shape[0] // 1000000) + 1)
    workbook = writer.book
    #Add standard formatting as required
    top_border = workbook.add_format({'top': 2})
    bottom_border = workbook.add_format({'bottom': 2})
    left_border = workbook.add_format({'left': 2})
    right_border = workbook.add_format({'right': 2})
    top_left_border = workbook.add_format({'top': 2, 'left': 2})
    top_right_border = workbook.add_format({'top': 2, 'right': 2})
    bottom_left_border = workbook.add_format({'bottom': 2, 'left': 2})
    bottom_right_border = workbook.add_format({'bottom': 2, 'right': 2})
    table_title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': '#305496', 'align': 'left', 'valign': 'vcenter'})
    title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': 'black', 'align': 'left', 'valign': 'vcenter'})
    for j, df_part in enumerate(split_df):
      part_name = sheet_name + (("_" + str(j)) if j > 0 else "")
      df_part.to_excel(writer, sheet_name=part_name, index=False, header=False, startrow=4, startcol=1)
      worksheet = writer.sheets[part_name]
      #Write the column headers above the data with the table title format
      for col_num, value in enumerate(df_part.columns.values):
        worksheet.write(3, col_num + 1, value, table_title_format)
      num_rows = df_part.shape[0]
      num_cols = df_part.shape[1]
      #Draw a thick border around the table using conditional formats
      worksheet.conditional_format(3, 2, 3, num_cols - 1, {"type": "no_errors", "format": top_border})
      worksheet.conditional_format(4, 1, num_rows + 2, 1, {"type": "no_errors", "format": left_border})
      worksheet.conditional_format(4, num_cols, num_rows + 2, num_cols, {"type": "no_errors", "format": right_border})
      worksheet.conditional_format(num_rows + 3, 2, num_rows + 3, num_cols - 1, {"type": "no_errors", "format": bottom_border})
      worksheet.conditional_format(3, 1, 3, 1, {"type": "no_errors", "format": top_left_border})
      worksheet.conditional_format(3, num_cols, 3, num_cols, {"type": "no_errors", "format": top_right_border})
      worksheet.conditional_format(num_rows + 3, 1, num_rows + 3, 1, {"type": "no_errors", "format": bottom_left_border})
      worksheet.conditional_format(num_rows + 3, num_cols, num_rows + 3, num_cols, {"type": "no_errors", "format": bottom_right_border})
      worksheet.hide_gridlines(2)
      worksheet.autofilter(3, 1, num_rows + 3, num_cols)
      worksheet.autofit()
      worksheet.set_column_pixels(0, 0, 20)
      worksheet.merge_range(1, 1, 1, num_cols, table_name + (("_" + str(j)) if j > 0 else ""), title_format)
    #Close the writer to save the workbook to the temporary path, then copy it to the output path
    writer.close()
    shutil.copy(f"{temp_path}_{i}.xlsx", output_path + file_name + ".xlsx")
    print(f"File {file_name}.xlsx created successfully")

I considered a few optimization opportunities, but they did not work out well:

  1. Doing all of the data processing in pyspark instead of pandas: not feasible, because the Excel workbooks need to be formatted with xlsxwriter.
  2. Using an alternative library to xlsxwriter: I could not reproduce the exact functionality that this function provides.
  3. Processing the dataframes in parallel and avoiding storing them in lists: this makes my code harder to follow, and I don't know how to make proper use of the Spark cluster for it (a rough, sequential sketch of the "no lists" idea follows this list).
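
What I had in mind for point 3, stripped of any parallelism, is roughly the sketch below: each CSV is read, converted and written inside the loop, so only one pandas DataFrame and one ExcelWriter exist on the driver at a time. This is only a sketch: process_one_csv is an illustrative name, it assumes the notebook's ambient spark session, only a subset of the read options is shown, and the xlsxwriter formatting is elided.

import shutil
import pandas as pd

def process_one_csv(csv_path, sheet_name, table_name, file_name, temp_path, output_path):
  #Read one CSV with Spark (subset of the options used in the full function)
  sdf = (spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(csv_path))
  pdf = sdf.toPandas()                                  #single DataFrame on the driver, not a list of them
  tmp_file = f"{temp_path}_{file_name}.xlsx"
  with pd.ExcelWriter(tmp_file, engine="xlsxwriter") as writer:
    pdf.to_excel(writer, sheet_name=sheet_name, index=False)
    # ... same xlsxwriter border/title formatting as in the full function ...
  shutil.copy(tmp_file, f"{output_path}{file_name}.xlsx")
  del pdf                                               #release driver memory before the next file

#Caller: same zipped inputs as before, but nothing is accumulated in lists
#for args in zip(csv_list, sheet_names, table_names, file_names):
#  process_one_csv(*args, temp_path, output_path)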

If anyone has ideas or suggestions on how to improve this function, they are very welcome. At the moment the driver node gets overloaded and RAM usage spikes because of pandas and xlsxwriter.
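
One thing I have read about but not tried yet is xlsxwriter's constant_memory mode, which streams each finished row to a temporary file instead of keeping the whole worksheet in RAM. It would require restructuring the writing so that everything is written strictly top-to-bottom (my current code writes the data first and the header row afterwards, which constant_memory does not allow). A minimal sketch with placeholder data and file name:

import pandas as pd
import xlsxwriter

df = pd.DataFrame({"a": range(5), "b": range(5)})          #stand-in for one toPandas() result

#constant_memory keeps only the current row in memory and flushes finished rows to disk
workbook = xlsxwriter.Workbook("/tmp/example.xlsx", {"constant_memory": True})
worksheet = workbook.add_worksheet("Sheet1")
header_fmt = workbook.add_format({"bold": True, "bg_color": "#305496", "font_color": "white"})

worksheet.write_row(0, 0, df.columns, header_fmt)           #header row first
for r, row in enumerate(df.itertuples(index=False), start=1):
  worksheet.write_row(r, 0, row)                            #then data rows, strictly top-to-bottom
workbook.close()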

python pandas pyspark azure-databricks xlsxwriter
1 Answer

Both openpyxl and xlsxwriter operate on pandas dataframes. You could use the spark plugin instead, but the spark plugin does not come with formatting capabilities. For the formatting, you could look at using an Excel template that already contains the formatting, then copy this template and update the copy with the data. The library can be installed at the cluster level.

Your code would then look like this:

path = "abfss://<container_name>@<storage_name>.dfs.core.windows.net/testdir/test.xlsx"  

spark_Df1.write.format("com.crealytics.spark.excel")\
  .option("header", "true")\
  .option("dataAddress", "'My Sheet1'!A1")\
  .mode("append")\
  .save(path)

The problem with xlsxwriter is that it can only write; it cannot read or update an existing file. So another option is to use the spark plugin to write the data, then use openpyxl to read it back and apply the formatting. There are plenty of examples of how to do this, for example THIS and THIS.
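
A very rough sketch of that second option, assuming the workbook has already been written (for example by the spark plugin above) and is reachable from the driver through a local path such as a /dbfs mount (openpyxl cannot open abfss:// URIs directly); the path and sheet name are placeholders:

from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill

local_path = "/dbfs/mnt/testdir/test.xlsx"   #hypothetical mount of the abfss location
wb = load_workbook(local_path)
ws = wb["My Sheet1"]

#Replicate the blue header styling used in the original xlsxwriter code
header_fill = PatternFill(start_color="305496", end_color="305496", fill_type="solid")
header_font = Font(bold=True, color="FFFFFF")
for cell in ws[1]:                           #first row holds the column headers
  cell.fill = header_fill
  cell.font = header_font

wb.save(local_path)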

Hopefully one of these options improves the performance.
