在 python 中填充队列并管理多处理

问题描述 投票:0回答:3

我在 python 中遇到这个问题:

  • 我有一个 URL 队列,需要时不时地检查一下
  • 如果队列已满,我需要处理队列中的每个项目
  • 队列中的每个项目必须由单个进程处理(多处理)

到目前为止,我成功地“手动”实现了这样的目标:

while 1:
        self.updateQueue()

        while not self.mainUrlQueue.empty():
            domain = self.mainUrlQueue.get()

            # if we didn't launched any process yet, we need to do so
            if len(self.jobs) < maxprocess:
                self.startJob(domain)
                #time.sleep(1)
            else:
                # If we already have process started we need to clear the old process in our pool and start new ones
                jobdone = 0

                # We circle through each of the process, until we find one free ; only then leave the loop 
                while jobdone == 0:
                    for p in self.jobs :
                        #print "entering loop"
                        # if the process finished
                        if not p.is_alive() and jobdone == 0:
                            #print str(p.pid) + " job dead, starting new one"
                            self.jobs.remove(p)
                            self.startJob(domain)
                            jobdone = 1

然而,这会导致大量的问题和错误。我想知道我是否更适合使用进程池。正确的方法是什么?

但是,很多时候我的队列是空的,一秒钟就可以填满 300 个项目,所以我不太确定在这里该怎么做。

python queue multiprocessing pool
3个回答
65
投票

您可以使用

queue
的阻塞功能在启动时生成多个进程(使用
multiprocessing.Pool
)并让它们休眠,直到队列上有一些数据可供处理。如果您对此不熟悉,您可以尝试“玩”这个简单的程序:

import multiprocessing
import os
import time

the_queue = multiprocessing.Queue()


def worker_main(queue):
    print os.getpid(),"working"
    while True:
        item = queue.get(True)
        print os.getpid(), "got", item
        time.sleep(1) # simulate a "long" operation

the_pool = multiprocessing.Pool(3, worker_main,(the_queue,))
#                           don't forget the comma here  ^

for i in range(5):
    the_queue.put("hello")
    the_queue.put("world")


time.sleep(10)

在 Linux 上使用 Python 2.7.3 进行测试

这将产生 3 个进程(除了父进程之外)。每个子进程都执行

worker_main
函数。这是一个简单的循环,在每次迭代时从队列中获取一个新项目。如果没有任何东西可以处理,工作人员将阻塞。

启动时,所有 3 个进程都会休眠,直到队列收到一些数据。当数据可用时,等待的工作人员之一会获取该数据并开始处理它。之后,它尝试从队列中获取其他项目,如果没有可用的,则再次等待...


12
投票

添加了一些代码(向队列提交“None”)以很好地关闭工作线程,并添加代码来关闭并加入 the_queue 和 the_pool:

import multiprocessing
import os
import time

NUM_PROCESSES = 20
NUM_QUEUE_ITEMS = 20  # so really 40, because hello and world are processed separately


def worker_main(queue):
    print(os.getpid(),"working")
    while True:
        item = queue.get(block=True) #block=True means make a blocking call to wait for items in queue
        if item is None:
            break

        print(os.getpid(), "got", item)
        time.sleep(1) # simulate a "long" operation


def main():
    the_queue = multiprocessing.Queue()
    the_pool = multiprocessing.Pool(NUM_PROCESSES, worker_main,(the_queue,))
            
    for i in range(NUM_QUEUE_ITEMS):
        the_queue.put("hello")
        the_queue.put("world")
    
    for i in range(NUM_PROCESSES):
        the_queue.put(None)

    # prevent adding anything more to the queue and wait for queue to empty
    the_queue.close()
    the_queue.join_thread()

    # prevent adding anything more to the process pool and wait for all processes to finish
    the_pool.close()
    the_pool.join()

if __name__ == '__main__':
    main()

0
投票

我对此进行了重新设计,以使用 ProcessPoolExecutor 而不是队列,因为我认为这是最新的,并且我在自己的实现中遇到了队列问题。这也摆脱了在队列中填充 n 个 None 来终止的情况:

from concurrent.futures import ProcessPoolExecutor
import os
import time

NUM_PROCESSES = 2
NUM_QUEUE_ITEMS = 4  # so really 40, because hello and world are processed separately


def worker(item):
    print(f"{os.getpid()} got {item}\n", end="")
    time.sleep(0.5) # simulate a "long" operation
    return f"Results {os.getpid()} for {item}"

        
def main():
    with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as exe:
        
        values = []
        for i in range(NUM_QUEUE_ITEMS):
            values.append(f"hello {i}")
            values.append(f"world {i}")

        exe.submit(worker,2)
         
        # Maps the method 'cube' with a iterable
        result = exe.map(worker,values)

    for r in result:
        print(f"{r}")
    

if __name__ == "__main__":
    main()
© www.soinside.com 2019 - 2024. All rights reserved.