我目前正在使用 Camelot 从 PDF 文件中检索表格。在许多情况下,一个表将跨越多个页面或有一个空白行。这两种情况导致 Camelot 生成两个或更多表(取决于有多少空白行或表跨越多少页)。我想知道在 Camelot 生成多个表后是否有一种方法可以将这些单独的表追加回一个表中?有没有办法告诉 Camelot 忽略分页符和/或空白行?
这是我为此编写的代码。它很长,可以从 CLI 调用,并且可以执行您想要的操作,只要表格有边框。我只使用有边框的表格,所以我只在“格子”(lattice)模式下使用 Camelot。我为未来的自己添加了很多注释 :) 如果将代码保存到名为“pdf-extract-tables.py”的文件中,则可以像这样调用它:
python /path/to/pdf-extract-tables.py -p /path/to/file.pdf -t 15
将其保存到名为 pdf-extract-tables.py 的文件中:
import argparse
import warnings
import camelot
import csv
import os
def get_continued_tables(tables, threshold):
    """Group Camelot tables that continue across consecutive pages.

    Two tables are treated as one continued table when the second sits on
    the page immediately after the first, both have the same number of
    columns, the first ends within the last ``threshold`` percent of the
    page and the second starts within the first ``threshold`` percent.

    Args:
        tables: iterable of camelot Table objects; only ``.page``,
            ``.cols`` and ``._bbox`` are accessed.
        threshold: percentage (0-100) of the page height used to decide
            whether a table touches the bottom/top edge of its page.

    Returns:
        list[list]: groups of Table objects, each group being one logical
        table that spans multiple pages. Tables that do not continue
        anywhere appear in no group.
    """
    groups = {}
    previous_table = None
    group_counter = 0
    # Typical PDF page height is 842 points and bottom margins are anywhere
    # between 56 and 85 points; accounting for margins gives 792.
    page_height = 792
    for table in tables:
        # Candidate continuation: a previous table exists, it is on the
        # immediately preceding page, and the column counts match.
        if (
            previous_table is not None
            and table.page == previous_table.page + 1
            and len(table.cols) == len(previous_table.cols)
        ):
            # PDF origin (0, 0) is the bottom-left corner of the page and
            # the y-coordinate grows upwards, so in (x0, y0, x1, y1) the
            # y0 entry is the table bottom and y1 is the table top.
            previous_table_bottom = previous_table._bbox[1]
            current_table_top = table._bbox[3]
            # Previous table ends in the last threshold% of its page and
            # the current one starts in the first threshold% of its page.
            if (previous_table_bottom < (threshold / 100) * page_height
                    and current_table_top > (1 - threshold / 100) * page_height):
                # Start the group with the previous table if this is the
                # first continuation we see for it...
                if groups.get(group_counter) is None:
                    groups[group_counter] = [previous_table]
                # ...then append the subsequent table to the group.
                groups[group_counter].append(table)
            else:
                # Not a continuation: move on to a fresh group id.
                group_counter += 1
        else:
            # Not a continuation: move on to a fresh group id.
            group_counter += 1
        # The current table becomes the previous one for the next iteration.
        previous_table = table
    # Drop the (possibly gappy) group ids and return just the groups.
    return list(groups.values())
def _write_rows(csv_writer, rows, batch_size=1000):
    """Write *rows* through *csv_writer* in batches of *batch_size*.

    Batching is more efficient than writing rows one by one. The final
    partial batch is always flushed — the previous inline version compared
    each row against a sentinel "last row" value and could silently drop
    the tail of a table when that comparison never matched.
    """
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            csv_writer.writerows(batch)
            batch = []
    if batch:
        csv_writer.writerows(batch)


def main():
    """CLI entry point: extract bordered tables from a PDF into CSV files.

    Tables detected by get_continued_tables as spanning multiple pages are
    appended to the CSV file of the first table of their group; all other
    tables get one CSV each, named the same way camelot's own CLI names
    them: <pdf-name>-page-<n>-table-<m>.csv, written next to the PDF.
    """
    class NewlineFormatter(argparse.RawDescriptionHelpFormatter):
        # Preserve explicit newlines in help texts instead of re-wrapping.
        def _split_lines(self, text, width):
            return text.splitlines()

    # Create the argument parser.
    parser = argparse.ArgumentParser(
        description='Returns an array of tables that should be grouped, also as an array.\nThe tables that should be grouped represent tables that span over multiple pages.',
        formatter_class=NewlineFormatter,
    )
    parser.add_argument('--path', '-p', type=str, metavar='', required=True,
                        help='path to the PDF file containing tables')
    parser.add_argument('--threshold', '-t', type=int, metavar='', default=15,
                        help='if the table on previous page ends in the last x%% of the page and\nthe table on the next page starts in the first x%% of the page,\ntables will be considered as spanning over those pages.\nDefault is 15')
    args = parser.parse_args()

    # Camelot warns for every page without tables; that is expected here.
    warnings.filterwarnings('ignore', message='No tables found on page-*')

    # Extract bordered tables from every page ("lattice" mode).
    tables = camelot.read_pdf(args.path, flavor='lattice', pages='all')

    # Groups of tables that span over multiple pages.
    continued_tables = get_continued_tables(tables, args.threshold)

    # CSVs are written next to the source PDF and named after it.
    pdf_file_name = os.path.basename(args.path)
    file_path = os.path.dirname(args.path)

    # Tables already written as part of a group, so the main loop skips them.
    written = []
    for table in tables:
        if table in written:
            continue
        # Same naming as camelot's CLI: suffix with page and table order.
        file_name = f"{pdf_file_name}-page-{table.parsing_report['page']}-table-{table.parsing_report['order']}.csv"
        # os.path.join keeps relative paths relative; the previous
        # file_path + '/' + file_name produced a path at the filesystem
        # root whenever dirname() returned ''.
        csv_path = os.path.join(file_path, file_name)
        with open(csv_path, 'w', newline='') as csv_file:
            _write_rows(csv.writer(csv_file), table.data)
        # If this table starts a multi-page group, append the rest of the
        # group's tables to the same CSV file.
        group = next((sublist for sublist in continued_tables if table in sublist), None)
        if group is not None:
            with open(csv_path, 'a', newline='') as csv_file:
                csv_writer = csv.writer(csv_file)
                for continued_table in group:
                    # Skip the current table (already written) and any
                    # table written earlier.
                    if continued_table == table or continued_table in written:
                        continue
                    _write_rows(csv_writer, continued_table.data)
                    # Remember it so the outer loop does not write it again.
                    written.append(continued_table)
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()