Python:如何检查multiprocessing.Pool中待处理任务的数量?

问题描述 投票:9回答:4

我有一小部分工人(4)和一大堆任务(5000~)。我正在使用池并使用map_async()发送任务。因为我正在运行的任务相当长,所以我强制执行1的chunksize,这样一个长进程就无法阻止一些较短的进程。

我想做的是定期检查剩下的任务数量。我知道最多4个会活跃,我关心有多少人要处理。

我用Google搜索过,我找不到任何人这样做。

一些简单的代码可以帮助:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break
python pool multiprocess
4个回答
7
投票

看起来像jobs._number_left就是你想要的。 _表示它是一个内部值,可能会随着开发人员的想法而改变,但它似乎是获取该信息的唯一方法。


1
投票

我不知道密不透风的方式,但是如果使用Pool.imap_unordered()函数而不是map_async,则可以拦截处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我正在减去process_count,因为你几乎可以假设所有进程都将使用以下两种例外之一进行处理:1)如果你使用迭代器,可能没有剩余的东西要消耗和处理,2)你可能有剩下的物品少于4件。我没有为第一个例外编写代码。但如果你需要的话,这应该很容易。无论如何,您的示例使用列表,因此您不应该遇到此问题。

编辑:我也意识到你正在使用一个While循环,这使得你看起来像是在试图定期更新某些内容,比如每半秒或者其他东西。我给出的代码作为示例不会这样做。我不确定这是不是一个问题。


1
投票

我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我如何处理它是用apply_async一次发送一个任务。我所做的大大简化的版本:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

请注意,我使用Queue而不是returning结果。


0
投票

您可以通过查看Pool._cache属性来检查待处理作业的数量,假设您使用的是apply_async。这是ApplyResult存储的地方,直到它们可用并且等于ApplyResults待定的数量。

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()
© www.soinside.com 2019 - 2024. All rights reserved.