我有一小部分工人(4)和一大堆任务(5000~)。我正在使用池并使用map_async()发送任务。因为我正在运行的任务相当长,所以我强制执行1的chunksize,这样一个长进程就无法阻止一些较短的进程。
我想做的是定期检查剩下的任务数量。我知道最多4个会活跃,我关心有多少人要处理。
我用Google搜索过,我找不到任何人这样做。
一些简单的代码可以帮助:
import multiprocessing
import time
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()
while True:
if not jobs.ready():
print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
jobs.wait(2)
else:
break
看起来像jobs._number_left就是你想要的。 _表示它是一个内部值,可能会随着开发人员的想法而改变,但它似乎是获取该信息的唯一方法。
我不知道密不透风的方式,但是如果使用Pool.imap_unordered()
函数而不是map_async,则可以拦截处理的元素。
import multiprocessing
import time
process_count = 4
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
# Actually, you should return the job you've created here.
return num
pool = multiprocess.Pool(process_count)
jobs = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
jobs.append(job)
job_count += 1
incomplete = len(items) - job_count
unsubmitted = max(0, incomplete - process_count)
print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted
pool.close()
我正在减去process_count
,因为你几乎可以假设所有进程都将使用以下两种例外之一进行处理:1)如果你使用迭代器,可能没有剩余的东西要消耗和处理,2)你可能有剩下的物品少于4件。我没有为第一个例外编写代码。但如果你需要的话,这应该很容易。无论如何,您的示例使用列表,因此您不应该遇到此问题。
编辑:我也意识到你正在使用一个While循环,这使得你看起来像是在试图定期更新某些内容,比如每半秒或者其他东西。我给出的代码作为示例不会这样做。我不确定这是不是一个问题。
我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我如何处理它是用apply_async
一次发送一个任务。我所做的大大简化的版本:
maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
if stopNowBooleanFunc(): # if for whatever reason I want to stop processing early
if donecounter == sendcounter: # wait til already sent tasks finish running
break
else: # don't send new tasks if it's time to stop
while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
pool.apply_async(mytask, (runlist[sendcounter], q))
sendcounter += 1
while not q.empty(): # process completed results as they arrive
aresult = q.get()
processResults(aresult)
donecounter += 1
请注意,我使用Queue
而不是return
ing结果。
您可以通过查看Pool._cache
属性来检查待处理作业的数量,假设您使用的是apply_async
。这是ApplyResult
存储的地方,直到它们可用并且等于ApplyResult
s待定的数量。
import multiprocessing as mp
import random
import time
def job():
time.sleep(random.randint(1,10))
print("job finished")
if __name__ == '__main__':
pool = mp.Pool(5)
for _ in range(10):
pool.apply_async(job)
while pool._cache:
print("number of jobs pending: ", len(pool._cache))
time.sleep(2)
pool.close()
pool.join()