My goal is to fetch URLs from a queue concurrently. Depending on the crawl results, the queue may be extended. Here is the MWE:
import queue
from concurrent.futures import ThreadPoolExecutor
import time

def get(url):  # let's assume that the HTTP magic happens here
    time.sleep(1)
    return f'data from {url}'

def crawl(url, url_queue: queue.Queue, result_queue: queue.Queue):
    data = get(url)
    result_queue.put(data)
    if 'more' in url:
        url_queue.put('url_extended')

url_queue = queue.Queue()
result_queue = queue.Queue()

for url in ('some_url', 'another_url', 'url_with_more', 'another_url_with_more', 'last_url'):
    url_queue.put(url)

with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        url = url_queue.get()
        executor.submit(crawl, url, url_queue, result_queue)

while not result_queue.empty():
    data = result_queue.get()
    print(data)
In this MWE, two URLs need to be crawled a second time: 'url_with_more' and 'another_url_with_more'. They are added to url_queue while they are being crawled.
However, this solution finishes before those two "more" URLs are processed; after the run, url_queue still holds two entries.
How do I make sure the ThreadPoolExecutor does not exit prematurely? Am I misunderstanding ThreadPoolExecutor?
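For illustration, the leftover entries can be verified by checking the queue size once the MWE has finished (this check is not part of the MWE itself):

print(url_queue.qsize())  # prints 2: the two 'url_extended' entries added by
                          # the 'more' crawls were never submitted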
You have a race condition: the check while not url_queue.empty() runs before url_queue.put('url_extended') happens, so the submit loop ends before the new URLs arrive. You need to keep the thread pool open and wait for all submitted jobs before deciding there is nothing left to do:
task_queue = queue.Queue()

with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        # submit everything that is currently queued
        while not url_queue.empty():
            url = url_queue.get()
            task_queue.put(executor.submit(crawl, url, url_queue, result_queue))
        # wait for every submitted crawl; the "more" crawls may have
        # pushed new entries into url_queue in the meantime
        while not task_queue.empty():
            task_queue.get().result()
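The same idea can also be written with a plain list of futures and concurrent.futures.wait instead of a second queue; this is only a sketch, reusing crawl, url_queue and result_queue from the MWE above:

from concurrent.futures import ThreadPoolExecutor, wait

with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        futures = []
        # submit every URL that is currently queued
        while not url_queue.empty():
            futures.append(executor.submit(crawl, url_queue.get(), url_queue, result_queue))
        # block until all submitted crawls are done; any new URLs they
        # discovered are already in url_queue by then
        wait(futures)

Either way, the key point is the same: submit and wait in rounds, and only let the executor shut down once a full round has added nothing new to url_queue.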