ThreadPoolExecutor exits before the queue is empty


My goal is to crawl URLs from a queue concurrently. Depending on the crawl results, the queue may grow. Here is a MWE:

import queue
from concurrent.futures import ThreadPoolExecutor
import time

def get(url): # let's assume that the HTTP magic happens here
    time.sleep(1)
    return f'data from {url}'

def crawl(url, url_queue: queue.Queue, result_queue: queue.Queue):
    data = get(url)
    result_queue.put(data)
    if 'more' in url:
        url_queue.put('url_extended')

url_queue = queue.Queue()
result_queue = queue.Queue()

for url in ('some_url', 'another_url', 'url_with_more', 'another_url_with_more', 'last_url'): 
    url_queue.put(url)


with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        url = url_queue.get()
        executor.submit(crawl, url, url_queue, result_queue)

while not result_queue.empty():
    data = result_queue.get()
    print(data)

In this MWE, two URLs need to be crawled further: 'url_with_more' and 'another_url_with_more'. They are added to url_queue while they are being crawled.

However, this solution finishes before those two "more" URLs are processed; after the run, url_queue still contains two entries.

How can I make sure the ThreadPoolExecutor does not exit prematurely? Am I misunderstanding ThreadPoolExecutor?

python concurrent.futures
1 Answer

You have a race condition: the check while not url_queue.empty() can run before crawl() has had a chance to call url_queue.put('url_extended'). You need to keep the executor alive until all submitted jobs have finished, and only then check whether they added new URLs:

task_queue = queue.Queue()

with ThreadPoolExecutor(max_workers=8) as executor:
    while True:
        # submit everything currently waiting in the URL queue
        while not url_queue.empty():
            url = url_queue.get()
            task_queue.put(executor.submit(crawl, url, url_queue, result_queue))
        # nothing in flight and nothing left to submit: we are done
        if task_queue.empty():
            break
        # block on the submitted crawls; they may add new URLs to url_queue
        while not task_queue.empty():
            task_queue.get().result()
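
If you prefer not to manage a second queue of futures, here is a minimal alternative sketch of the same idea, assuming the crawl(), url_queue and result_queue from the question: keep the Futures in a plain list and drain it with concurrent.futures.wait(). Any URLs added during one pass are picked up on the next pass.

from concurrent.futures import ThreadPoolExecutor, wait

with ThreadPoolExecutor(max_workers=8) as executor:
    pending = []
    while True:
        # submit whatever is currently queued
        while not url_queue.empty():
            pending.append(executor.submit(crawl, url_queue.get(), url_queue, result_queue))
        if not pending:
            break          # nothing running and nothing queued: finished
        wait(pending)      # block until every submitted crawl completes
        pending.clear()    # any URLs they added are now in url_queue

With either version the two '*_with_more' URLs are crawled on a second pass, so result_queue should end up with seven entries instead of five.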