使用 ProcessPoolExecutor 异步转换文件

问题描述 投票:0回答:1

我有这段代码,用于将 doc 文件转换为 pdf,但我觉得它没有正确地完成工作。我希望当我把文件放入目录时,这段代码能够同时处理尽可能多的文件。

import os
import time
import pythoncom
from win32com import client
from docx.shared import Pt
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from concurrent.futures import ThreadPoolExecutor
import shutil

from concurrent.futures import ProcessPoolExecutor
import concurrent.futures


# Directory that is watched for new files and under which per-report folders are created.
baseAd = r"C:\inetpub\wwwroot\utkuploads"
# Events appended by DocFileHandler.on_created and popped by main().
fileList = list()
# Pool sizes: one worker per reported CPU, falling back to 1 when the count is unknown.
max_threads = max_processes = os.cpu_count() or 1
def createText(filename, filedetail):
    """Write ``filedetail`` as text to ``<filename>.txt`` in the uploads folder.

    The file is opened in ``'w'`` mode, so an existing file with the same
    name is overwritten.
    """
    target = rf"C:\inetpub\wwwroot\utkuploads\{filename}.txt"
    with open(target, 'w') as out:
        out.write(format(filedetail))

def doc2pdf(doc_name, pdf_name, font_size=8):
    """Convert a Word document to PDF through Word COM automation.

    Args:
        doc_name: Path of the document to open (opened read-only).
        pdf_name: Path of the PDF to write; an existing file is deleted first.
        font_size: Font size applied to the whole document before export.

    Returns:
        ``pdf_name``. A failure of the save step itself is logged through
        ``createText('wordToPdfException', ...)`` and does not raise, so the
        returned path is not guaranteed to exist.

    Any other error (Word cannot start, the document cannot be opened, ...)
    propagates to the caller, but only after the document, the Word instance
    and the COM apartment have been released, so a failed conversion does not
    leave an orphaned Word process behind.
    """
    pythoncom.CoInitialize()
    try:
        word = client.DispatchEx("Word.Application")
        try:
            if os.path.exists(pdf_name):
                os.remove(pdf_name)
            worddoc = word.Documents.Open(doc_name, ReadOnly=1)
            try:
                worddoc.Content.Font.Size = font_size
                try:
                    # FileFormat=17 is Word's PDF export format.
                    worddoc.SaveAs(pdf_name, FileFormat=17)
                except Exception as e:
                    createText('wordToPdfException', f"{e}")
            finally:
                # The font change leaves the document modified; 0 means
                # "do not save changes", so closing never asks to save.
                worddoc.Close(SaveChanges=0)
        finally:
            # Quit the Word application
            word.Quit()
    finally:
        pythoncom.CoUninitialize()
    return pdf_name


class DocFileHandler(FileSystemEventHandler):
    """Watchdog handler that queues newly created files in ``fileList``."""

    def is_temporary_file(self, filename):
        """Return True if ``filename`` starts with the ``~$`` temporary-file prefix."""
        return filename.startswith("~$")

    def on_created(self, event):
        """Append ``event`` to ``fileList`` unless it should be ignored.

        Ignored events: directories, paths ending in ``.tmp`` and files whose
        base name starts with ``~$``.
        """
        if event.is_directory:
            return
        if event.src_path.endswith('.tmp'):
            return
        if self.is_temporary_file(os.path.basename(event.src_path)):
            return
        fileList.append(event)

def outer_is_temporary_file(filename):
    """Return True if ``filename`` names a ``~$``-prefixed temporary file.

    Accepts either a bare file name or a full path. Previously only the start
    of the whole string was tested, so a full path such as
    ``C:\\dir\\~$report.docx`` was never recognised as temporary.
    """
    # Extract the last path component by hand so that both "\\" and "/" are
    # treated as separators, whatever platform this runs on.
    base_name = filename.replace("\\", "/").rsplit("/", 1)[-1]
    return filename.startswith("~$") or base_name.startswith("~$")

def createFolder(baseAd, folderName):
    """Create ``folderName`` under ``baseAd`` if needed and return its path.

    Creation is best-effort: the joined path is returned even when the
    directory could not be created, so the caller's next file operation is
    what reports the problem.
    """
    path = os.path.join(baseAd, folderName)
    try:
        # exist_ok avoids the check-then-create race when several workers
        # need the same folder at the same time.
        os.makedirs(path, exist_ok=True)
    except OSError:
        # Deliberately ignored (see docstring); only filesystem errors are
        # swallowed, not KeyboardInterrupt/SystemExit as a bare except would.
        pass
    return path

def createFolderAtt(folderName):
    """Create the directory ``folderName`` if needed and return ``folderName``.

    Creation is best-effort: the path is returned even when the directory
    could not be created.
    """
    try:
        # exist_ok avoids the check-then-create race between concurrent workers.
        os.makedirs(folderName, exist_ok=True)
    except OSError:
        # Deliberately ignored; the caller's subsequent move reports the failure.
        pass
    return folderName

def _convert_and_file_report(doc_path, pdf_path, name_suffix, error_log_name):
    """Convert ``doc_path`` to ``pdf_path`` and move both files into the report folder.

    The folder is created under ``baseAd`` and named after the file name up to
    its first ".", with ``name_suffix`` (if non-empty) removed. A failure to
    obtain the folder is logged under ``error_log_name`` and nothing is moved.
    """
    doc2pdf(doc_path, pdf_path)

    pdf_file_name = os.path.split(pdf_path)[-1]
    folder_name = pdf_file_name.split(".")[0]
    if name_suffix:
        folder_name = folder_name.replace(name_suffix, '')

    try:
        target_folder = createFolder(baseAd, folder_name)
    except Exception as error:
        createText(error_log_name, f'{error}')
        # Without a target folder there is nowhere to move the files to.
        return

    shutil.move(pdf_path, os.path.join(target_folder, pdf_file_name))
    shutil.move(doc_path, os.path.join(target_folder, os.path.split(doc_path)[-1]))


def _move_attachment(doc_path, file_name):
    """Move an attachment named ``<name>@<folder>.<ext>`` into ``<folder>`` next to it.

    Returns True when the file was moved, False when the retry failed (the
    error is then logged as ``InnerAttachmentError``).
    """
    folder_name = file_name.split("@")[1].split(".")[0]
    dest_folder = os.path.join(os.path.split(doc_path)[0], folder_name)
    dest_path = os.path.join(dest_folder, file_name)
    try:
        shutil.move(doc_path, dest_path)
    except OSError:
        # Most likely the destination folder does not exist yet:
        # create it and retry once.
        try:
            createFolderAtt(dest_folder)
            shutil.move(doc_path, dest_path)
        except Exception as e:
            createText('InnerAttachmentError', f'{e}')
            return False
    return True


def mainConverter(event):
    """Process one watchdog "created" event.

    * Files with "@" in their name are attachments and are moved into the
      folder named by the part between "@" and the next ".".
    * Other ``.docx`` files are converted to PDF; the PDF and the DOCX are then
      moved into a folder under ``baseAd`` named after the report. Paths
      containing "_" are treated as templates and skipped, as are paths
      containing "TEMPLATE-REPORT".

    Errors are written to text files through ``createText`` instead of being
    raised.
    """
    src_path = event.src_path
    # (directory, file name) pair; printed as-is in the progress messages.
    currentFileName = os.path.split(src_path)
    base_name = currentFileName[-1]
    try:
        # Test the bare file name: a full path never starts with "~$".
        if event.event_type != 'created' or outer_is_temporary_file(base_name):
            return

        if '@' in base_name:
            print(f"{currentFileName} ATTACHMENT STARTED")
            try:
                if _move_attachment(src_path, base_name):
                    print(f"{currentFileName} ATTACHMENT MOVED")
            except Exception as e:
                createText('outerAttachmentErrorOccured', f'{e}')
            return

        if not src_path.lower().endswith('.docx'):
            return

        print(f"{currentFileName} STARTED")
        doc_path = src_path
        pdf_path = os.path.splitext(doc_path)[0] + '.pdf'

        if '_' in doc_path:
            print(f'New Template has been detected: {doc_path}')
            return

        # Not temporary, not an attachment path, not a TEMPLATE-REPORT.
        if '~$' not in doc_path and '@' not in doc_path and 'TEMPLATE-REPORT' not in doc_path:
            try:
                if '-GENERATED-REPORT' in doc_path:
                    _convert_and_file_report(
                        doc_path, pdf_path, '-GENERATED-REPORT', 'createFolderGeneratedReport')
                elif '-IMZALIRAPOR' in doc_path:
                    _convert_and_file_report(
                        doc_path, pdf_path, '-IMZALIRAPOR', 'CreateFolderImzaliRapor')
                elif 'GENERATED-REPORT' not in doc_path:
                    _convert_and_file_report(doc_path, pdf_path, '', 'buAnaModel')
            except Exception as e:
                createText('exceptionHasOccured...', f'{e}')

        print(f"{currentFileName} FINISHED")

    except Exception as e:
        createText('outerAllExceptionasOccured', f'{e}')

def main():
    """Hand every event queued in ``fileList`` to a process pool, forever.

    The loop never returns on its own; it ends only when an exception such as
    KeyboardInterrupt is raised. Events are taken with ``list.pop()``, i.e.
    the most recently queued event is submitted first.
    """
    with ProcessPoolExecutor(max_processes) as executor:
        while True:
            if not fileList:
                # Nothing queued: sleep briefly instead of spinning, which
                # would keep one CPU core fully busy doing nothing.
                time.sleep(0.1)
                continue
            file_to_process = fileList.pop()
            print('File has been sent', file_to_process)
            executor.submit(mainConverter, file_to_process)


if __name__ == '__main__':
    # Watch the uploads directory (non-recursively) and queue new files.
    directory_to_watch = r"C:\inetpub\wwwroot\utkuploads"
    event_handler = DocFileHandler()

    observer = Observer()
    observer.schedule(event_handler, path=directory_to_watch, recursive=False)
    observer.start()

    try:
        main()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the watcher on every exit path, not only on Ctrl+C.
        observer.stop()
        observer.join()

我尝试过三种不同的方法,其中性能最好的是 ProcessPoolExecutor,但它仍然不能同时转换文件:它并没有像我期望的那样同时打开(比如)5 个 Word 实例并异步(async)地转换它们。

with ProcessPoolExecutor(max_processes) as executor:
with concurrent.futures.ProcessPoolExecutor(max_processes) as executor:
with ThreadPoolExecutor(max_threads) as executor:

我做错了什么?

python asynchronous pdf ms-word multiprocessing
1个回答
0
投票

你的函数 `main` 不断循环检查 `fileList` 中是否有内容。这种忙等待效率很低,而且可能妨碍事件处理程序线程及时向 `fileList` 添加新文件。更好的做法是让检测新建文件的事件处理程序直接访问多进程池,这样它一检测到新创建的文件,就可以直接提交新的转换任务。

请尝试以下代码,但在运行之前请先通读检查一遍,以确保我没有错误地改动任何内容(我还顺便尝试做了一些其他效率上的改进):

import os
import time
import pythoncom
from win32com import client
from docx.shared import Pt
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import shutil

# Switch between a process pool (True) and a thread pool (False).
USE_MULTIPROCESSING = True

# Bind the chosen executor class to the single name PoolExecutor.
if not USE_MULTIPROCESSING:
    from concurrent.futures import ThreadPoolExecutor as PoolExecutor
else:
    from concurrent.futures import ProcessPoolExecutor as PoolExecutor

# Directory that is watched and under which report folders are created.
baseAd = r"C:\inetpub\wwwroot\utkuploads"

def createText(filename, filedetail):
    """Write ``filedetail`` as text to ``<filename>.txt`` inside ``baseAd``.

    The file is opened in ``'w'`` mode, so an existing file with the same
    name is overwritten.
    """
    # The previous template contained a "{baseAd}" placeholder that was never
    # passed to str.format(), so every call raised KeyError('baseAd').
    with open(os.path.join(baseAd, f"{filename}.txt"), 'w') as f:
        f.write(f'{filedetail}')

def doc2pdf(doc_name, pdf_name, font_size=8):
    """Convert a Word document to PDF through Word COM automation.

    Args:
        doc_name: Path of the document to open (opened read-only).
        pdf_name: Path of the PDF to write; an existing file is deleted first.
        font_size: Font size applied to the whole document before export.

    Returns:
        ``pdf_name``. A failure of the save step itself is logged through
        ``createText('wordToPdfException', ...)`` and does not raise, so the
        returned path is not guaranteed to exist.

    Any other error (Word cannot start, the document cannot be opened, ...)
    propagates to the caller, but only after the document, the Word instance
    and the COM apartment have been released, so a failed conversion does not
    leave an orphaned Word process behind.
    """
    pythoncom.CoInitialize()
    try:
        word = client.DispatchEx("Word.Application")
        try:
            if os.path.exists(pdf_name):
                os.remove(pdf_name)
            worddoc = word.Documents.Open(doc_name, ReadOnly=1)
            try:
                worddoc.Content.Font.Size = font_size
                try:
                    # FileFormat=17 is Word's PDF export format.
                    worddoc.SaveAs(pdf_name, FileFormat=17)
                except Exception as e:
                    createText('wordToPdfException', f"{e}")
            finally:
                # The font change leaves the document modified; 0 means
                # "do not save changes", so closing never asks to save.
                worddoc.Close(SaveChanges=0)
        finally:
            # Quit the Word application
            word.Quit()
    finally:
        pythoncom.CoUninitialize()
    return pdf_name

def outer_is_temporary_file(filename):
    """Return True if ``filename`` names a ``~$``-prefixed temporary file.

    Accepts either a bare file name or a full path. Previously only the start
    of the whole string was tested, so a full path such as
    ``C:\\dir\\~$report.docx`` was never recognised as temporary.
    """
    # Extract the last path component by hand so that both "\\" and "/" are
    # treated as separators, whatever platform this runs on.
    base_name = filename.replace("\\", "/").rsplit("/", 1)[-1]
    return filename.startswith("~$") or base_name.startswith("~$")

def createFolder(baseAd, folderName):
    """Create ``folderName`` under ``baseAd`` if needed and return its path.

    Creation is best-effort: the joined path is returned even when the
    directory could not be created, so the caller's next file operation is
    what reports the problem.
    """
    path = os.path.join(baseAd, folderName)
    try:
        # exist_ok avoids the check-then-create race when several workers
        # need the same folder at the same time.
        os.makedirs(path, exist_ok=True)
    except OSError:
        # Deliberately ignored (see docstring); only filesystem errors are
        # swallowed, not KeyboardInterrupt/SystemExit as a bare except would.
        pass
    return path

def createFolderAtt(folderName):
    """Create the directory ``folderName`` if needed and return ``folderName``.

    Creation is best-effort: the path is returned even when the directory
    could not be created.
    """
    try:
        # exist_ok avoids the check-then-create race between concurrent workers.
        os.makedirs(folderName, exist_ok=True)
    except OSError:
        # Deliberately ignored; the caller's subsequent move reports the failure.
        pass
    return folderName

def _convert_and_file_report(doc_path, pdf_path, name_suffix, error_log_name):
    """Convert ``doc_path`` to ``pdf_path`` and move both files into the report folder.

    The folder is created under ``baseAd`` and named after the file name up to
    its first ".", with ``name_suffix`` (if non-empty) removed. A failure to
    obtain the folder is logged under ``error_log_name`` and nothing is moved.
    """
    doc2pdf(doc_path, pdf_path)

    pdf_file_name = os.path.split(pdf_path)[-1]
    folder_name = pdf_file_name.split(".")[0]
    if name_suffix:
        folder_name = folder_name.replace(name_suffix, '')

    try:
        target_folder = createFolder(baseAd, folder_name)
    except Exception as error:
        createText(error_log_name, f'{error}')
        # Without a target folder there is nowhere to move the files to.
        return

    shutil.move(pdf_path, os.path.join(target_folder, pdf_file_name))
    shutil.move(doc_path, os.path.join(target_folder, os.path.split(doc_path)[-1]))


def _move_attachment(doc_path, file_name):
    """Move an attachment named ``<name>@<folder>.<ext>`` into ``<folder>`` next to it.

    Returns True when the file was moved, False when the retry failed (the
    error is then logged as ``InnerAttachmentError``).
    """
    folder_name = file_name.split("@")[1].split(".")[0]
    dest_folder = os.path.join(os.path.split(doc_path)[0], folder_name)
    dest_path = os.path.join(dest_folder, file_name)
    try:
        shutil.move(doc_path, dest_path)
    except OSError:
        # Most likely the destination folder does not exist yet:
        # create it and retry once.
        try:
            createFolderAtt(dest_folder)
            shutil.move(doc_path, dest_path)
        except Exception as e:
            createText('InnerAttachmentError', f'{e}')
            return False
    return True


def mainConverter(event):
    """Process one watchdog "created" event (runs inside a pool worker).

    * Files with "@" in their name are attachments and are moved into the
      folder named by the part between "@" and the next ".".
    * Other ``.docx`` files are converted to PDF; the PDF and the DOCX are then
      moved into a folder under ``baseAd`` named after the report. Paths
      containing "_" are treated as templates and skipped, as are paths
      containing "TEMPLATE-REPORT".

    Errors are written to text files through ``createText`` instead of being
    raised.
    """
    # The event's path attribute is ``src_path`` (the handler reads the same
    # attribute); ``event.src`` raised AttributeError for every submitted file.
    file_to_process = event.src_path
    # (directory, file name) pair; printed as-is in the progress messages.
    currentFileName = os.path.split(file_to_process)
    base_name = currentFileName[-1]
    try:
        # Test the bare file name: a full path never starts with "~$".
        if event.event_type != 'created' or outer_is_temporary_file(base_name):
            return

        if '@' in base_name:
            print(f"{currentFileName} ATTACHMENT STARTED")
            try:
                if _move_attachment(file_to_process, base_name):
                    print(f"{currentFileName} ATTACHMENT MOVED")
            except Exception as e:
                createText('outerAttachmentErrorOccured', f'{e}')
            return

        if not file_to_process.lower().endswith('.docx'):
            return

        print(f"{currentFileName} STARTED")
        doc_path = file_to_process
        pdf_path = os.path.splitext(doc_path)[0] + '.pdf'

        if '_' in doc_path:
            print(f'New Template has been detected: {doc_path}')
            return

        # Not temporary, not an attachment path, not a TEMPLATE-REPORT.
        if '~$' not in doc_path and '@' not in doc_path and 'TEMPLATE-REPORT' not in doc_path:
            try:
                if '-GENERATED-REPORT' in doc_path:
                    _convert_and_file_report(
                        doc_path, pdf_path, '-GENERATED-REPORT', 'createFolderGeneratedReport')
                elif '-IMZALIRAPOR' in doc_path:
                    _convert_and_file_report(
                        doc_path, pdf_path, '-IMZALIRAPOR', 'CreateFolderImzaliRapor')
                elif 'GENERATED-REPORT' not in doc_path:
                    _convert_and_file_report(doc_path, pdf_path, '', 'buAnaModel')
            except Exception as e:
                createText('exceptionHasOccured...', f'{e}')

        print(f"{currentFileName} FINISHED")

    except Exception as e:
        createText('outerAllExceptionasOccured', f'{e}')

class DocFileHandler(FileSystemEventHandler):
    """Watchdog handler that submits newly created files straight to a pool."""

    def __init__(self, executor):
        """Store the executor used to run conversions.

        Args:
            executor: Object with a ``submit(fn, *args)`` method; ``main()``
                passes the ``PoolExecutor`` it creates.
        """
        super().__init__()
        self.executor = executor

    @staticmethod
    def is_temporary_file(filename):
        """Return True if ``filename`` starts with the ``~$`` temporary-file prefix."""
        # Declared static: without ``self`` in the signature, calling it as
        # ``self.is_temporary_file(name)`` raised TypeError on every event.
        return filename.startswith("~$")

    def on_created(self, event):
        """Submit ``mainConverter(event)`` unless the event should be ignored.

        Ignored events: directories, paths ending in ``.tmp`` and files whose
        base name starts with ``~$``.
        """
        file_to_process = event.src_path
        if event.is_directory or file_to_process.endswith('.tmp'):
            return
        if self.is_temporary_file(os.path.basename(file_to_process)):
            return
        print('File has been sent', event)
        self.executor.submit(mainConverter, event)


def main():
    """Watch ``baseAd`` and convert new files until the user presses Enter.

    New-file events are submitted to the pool by ``DocFileHandler``; this
    function only sets things up and then blocks on ``input()``.
    """
    # One worker per CPU, capped at 61: ProcessPoolExecutor rejects a larger
    # max_workers on Windows.
    pool_size = min(os.cpu_count() or 1, 61)

    with PoolExecutor(pool_size) as executor:
        observer = Observer()
        observer.schedule(DocFileHandler(executor), path=baseAd, recursive=False)
        observer.start()

        try:
            input('Hit Enter to terminate: ')
        except KeyboardInterrupt:
            pass
        finally:
            # Stop the watcher on every exit path so no new work is submitted
            # while the pool is shutting down.
            observer.stop()
            observer.join()

    # Leaving the ``with`` block calls executor.shutdown(), which waits for
    # all submitted conversions to complete.


# Run the watcher only when this file is executed as a script, not on import.
# NOTE(review): presumably also required so that pool worker processes that
# import this module do not start a second watcher; confirm for your platform.
if __name__ == "__main__":
    main()
© www.soinside.com 2019 - 2024. All rights reserved.