使用池的并行处理

问题描述 投票:0回答:2

我有文件从外部系统进入,并由DB进入,对于每个新文件,我正在按顺序通过4个函数来处理它。我的代码可以一次处理一个文件。

[目前,我正在尝试使用Pool并行处理文件。我不确定我的代码是否正在并行处理,因为并行处理对我来说是新手,无法找到一种方法在控制台中查看详细信息,例如-

file 1 processing with thread 1
file 2 processing with thread 2
file 1 processing complete with thread 1
file 2 processing complete with thread 2
...so on.

请任何人可以帮助我在控制台中获得这种输出。

我的Python代码:

import os
import threading
import subprocess
import pyodbc
import time
from multiprocessing.dummy import Pool as ThreadPool

class Workflow:

    def sql_connection(self):
        conn = pyodbc.connect('Driver={SQL Server};'
                              'Server=MSSQLSERVER01;'
                              'Database=TEST;'
                              'Trusted_Connection=yes;')
        print("DB Connected..")
        return conn

    def Function1(self):
        print ("function 1 Started..")


    def Function2(self):
        print ("function 2 Started..")

    def Function3(self):
        print ("function 3 Started..")


    def Function4(self):
        print ("function 4 Started..")

    def ProcessFile(self):
        print (" Processs %s\tWaiting %s seconds" )
        self.Function1()
        self.Function2()
        self.Function3()
        self.Funciton4()
        print (" Process %s\tDONE" )


    def Start(self):

        #Get number of files in REQUESTED STATE.
        connsql = self.sql_connection()
        query = "select count(*) from [TEST].[dbo].[files] where Status ='REQUESTED'"
        files = connsql.cursor().execute(query).fetchone()
        print(str(files[0]) + " files to be processed..")

        # Get filing ids of files in REQUESTED STATE.
        query = "select distinct filing_id from [TEST].[dbo].[files] where Status ='REQUESTED'"
        resultset = connsql.cursor().execute(query).fetchall()

        filingIds = []

        for id in resultset:
            filingIds.append(id[0])

        connsql.cursor().commit()
        connsql.close()

        #Create Threads based on number of file ids to be processed.
        pool = ThreadPool(len(filingIds))

        results = pool.map(self.ProcessFile(),filingIds) ## Process the FilingIds in parallel.

        print(results)

        # close the pool and wait for the work to finish
        pool.close()
        pool.join()

A = Workflow()
A.Start()
python python-multithreading
2个回答
0
投票

我认为问题仅仅是因为您错误地使用了ThreadPool.map。您必须传递self.ProcessFile而不是self.ProcessFile()。为什么?

map期望可调用,但是self.ProcessFile()实际上是ProcessFile调用的结果,即为None。因此map尝试调用None,这可能会在无提示的情况下失败。


0
投票
from multiprocessing import Process

import time
class WorkFlow:
    def __init__(self):
        pass

    def func1(self, *args):
        print('Func1 : {}'.format(args))
        time.sleep(5)
        print('Func1 Completed!')

    def func2(self, *args):
        print('Func2 : {}'.format(args))
        time.sleep(10)
        print('Func2 Completed!')

if __name__ == '__main__':
    wf = WorkFlow()
    processes = [Process(target=wf.func1), Process(target=wf.func2)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

上面的代码将启动3个Python进程(1 Master Process, 2 Slave Proceesses)。第一个Python进程将在5秒后终止,第二个Python进程将在10秒后终止。

这可以在Linux上使用top命令看到。

PID   COMMAND      %CPU TIME     #TH   #WQ  #PORT MEM    PURG   CMPRS
9918  Python       0.0  00:00.00 1     0    8     2148K  0B     0B
9917  Python       0.0  00:00.00 1     0    8     2144K  0B     0B
9916  Python       0.0  00:00.05 1     0    14    6680K  0B     0B
© www.soinside.com 2019 - 2024. All rights reserved.