I have files coming in from an external system and recorded in the DB, and for each new file I run it through 4 functions in sequence. My code can process one file at a time.
Currently, I am trying to use a Pool to process the files in parallel. I am not sure whether my code is actually processing in parallel, since parallel processing is new to me, and I cannot find a way to see details in the console, such as -
file 1 processing with thread 1
file 2 processing with thread 2
file 1 processing complete with thread 1
file 2 processing complete with thread 2
...so on.
Could anyone please help me get this kind of output in the console?
My Python code:
import os
import threading
import subprocess
import pyodbc
import time
from multiprocessing.dummy import Pool as ThreadPool

class Workflow:
    def sql_connection(self):
        conn = pyodbc.connect('Driver={SQL Server};'
                              'Server=MSSQLSERVER01;'
                              'Database=TEST;'
                              'Trusted_Connection=yes;')
        print("DB Connected..")
        return conn

    def Function1(self):
        print("function 1 Started..")

    def Function2(self):
        print("function 2 Started..")

    def Function3(self):
        print("function 3 Started..")

    def Function4(self):
        print("function 4 Started..")

    def ProcessFile(self):
        print(" Processs %s\tWaiting %s seconds")
        self.Function1()
        self.Function2()
        self.Function3()
        self.Funciton4()
        print(" Process %s\tDONE")

    def Start(self):
        # Get number of files in REQUESTED STATE.
        connsql = self.sql_connection()
        query = "select count(*) from [TEST].[dbo].[files] where Status ='REQUESTED'"
        files = connsql.cursor().execute(query).fetchone()
        print(str(files[0]) + " files to be processed..")
        # Get filing ids of files in REQUESTED STATE.
        query = "select distinct filing_id from [TEST].[dbo].[files] where Status ='REQUESTED'"
        resultset = connsql.cursor().execute(query).fetchall()
        filingIds = []
        for id in resultset:
            filingIds.append(id[0])
        connsql.cursor().commit()
        connsql.close()
        # Create Threads based on number of file ids to be processed.
        pool = ThreadPool(len(filingIds))
        results = pool.map(self.ProcessFile(), filingIds)  ## Process the FilingIds in parallel.
        print(results)
        # close the pool and wait for the work to finish
        pool.close()
        pool.join()

A = Workflow()
A.Start()
I think the problem is simply that you are using ThreadPool.map incorrectly. You must pass self.ProcessFile, not self.ProcessFile(). Why? map expects a callable, but self.ProcessFile() is actually the result of calling ProcessFile, which is None. So map tries to call None, which probably fails silently.
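A minimal sketch of the corrected call, assuming ProcessFile is changed to accept the filing id as a parameter (the original takes no argument, so map would also fail on arity) and to print the current thread's name, which produces roughly the console output you asked for:

```python
import threading
from multiprocessing.dummy import Pool as ThreadPool

class Workflow:
    def ProcessFile(self, filing_id):
        # current_thread().name shows which pool thread handles which file.
        name = threading.current_thread().name
        print("file {} processing with {}".format(filing_id, name))
        # ... call Function1 .. Function4 here ...
        print("file {} processing complete with {}".format(filing_id, name))
        return filing_id

filingIds = [1, 2, 3]  # stand-in for the ids fetched from the DB
wf = Workflow()
pool = ThreadPool(len(filingIds))
results = pool.map(wf.ProcessFile, filingIds)  # note: no parentheses
pool.close()
pool.join()
print(results)  # map preserves input order: [1, 2, 3]
```

The print lines from different threads may interleave, but results always comes back in the order of filingIds.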
from multiprocessing import Process
import time

class WorkFlow:
    def __init__(self):
        pass

    def func1(self, *args):
        print('Func1 : {}'.format(args))
        time.sleep(5)
        print('Func1 Completed!')

    def func2(self, *args):
        print('Func2 : {}'.format(args))
        time.sleep(10)
        print('Func2 Completed!')

if __name__ == '__main__':
    wf = WorkFlow()
    processes = [Process(target=wf.func1), Process(target=wf.func2)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
The above code will start 3 Python processes (1 master process and 2 worker processes). The first worker process will terminate after 5 seconds and the second after 10 seconds. This can be seen on Linux with the top command.
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS
9918 Python 0.0 00:00.00 1 0 8 2148K 0B 0B
9917 Python 0.0 00:00.00 1 0 8 2144K 0B 0B
9916 Python 0.0 00:00.05 1 0 14 6680K 0B 0B