I have a list of URLs in article_urls: list[str], and I'm trying to create a few workers that visit those URLs, scrape a different URL from each page, and put that URL onto another queue. I then want a few other workers to go through that second queue, download the file at each URL, and upload it to S3. I believe the program stalls at entry = await queue.get() and never moves on. How do I fix this?
def get_first_pass_or_none(inp, driver):
    for x in inp:
        try:
            return x(driver)
        except:
            pass
    return None
async def url_producer(download_queue, pages_queue, producer_id):
    first = True
    while True:
        try:
            article_url = await pages_queue.get()
            instance_driver = wd[producer_id]  # or any other webdriver
            instance_driver.get(article_url)
            article_id = str(uuid.uuid4())
            if first:
                await asyncio.sleep(2)
                instance_driver.find_element(
                    By.XPATH, '//*[@id="onetrust-close-btn-container"]/button').click()
                first = False
            article_title = get_first_pass_or_none([lambda instance_driver: instance_driver.find_element(
                By.XPATH, '//*[@id="documentTitle"]').text], instance_driver)
            author = get_first_pass_or_none([lambda instance_driver: re.sub(r"\([^()]*\)", "", instance_driver.find_element(
                By.XPATH, '//*[@id="authordiv"]/span[2]/span/a/strong').text)], instance_driver)
            publication_info = get_first_pass_or_none([lambda instance_driver: instance_driver.find_element(
                By.XPATH, '//*[@id="authordiv"]/span[2]/span').text], instance_driver)
            publication_location = get_first_pass_or_none([lambda publication_info: re.findall(
                r'\[(.*?)\]', publication_info)[0]], instance_driver)
            publication_date = publication_info
            output_metadata[article_id] = {
                "title": article_title,
                "author": author,
                "location": publication_location,
                "date": publication_date
            }
            pdf_url = instance_driver.find_element(
                By.CLASS_NAME, 'pdf-download').get_attribute('href')
            await download_queue.put({
                "article_id": article_id,
                "pdf_url": pdf_url,
            })
            print(download_queue.qsize())
            pages_queue.task_done()
        except Exception as e:
            logger.debug(f"Error {e}")
            keyboard.wait(keys[producer_id])
async def pdf_downloader(queue, consumer_id):
    while True:
        try:
            print(f"pdf_downloader {consumer_id} waiting for queue")
            entry = await queue.get()
            print(f"pdf_downloader {consumer_id} got queue entry")
            article_id = entry['article_id']
            pdf_url = entry['pdf_url']
            response = requests.get(pdf_url)
            pdf_content = response.content
            object_key = f"{article_id}.pdf"
            s3.Bucket(bucket_name).put_object(Key=object_key, Body=pdf_content)
            queue.task_done()
        except Exception as e:
            logger.debug(f"Error {e}")
async def main():
    # Create the shared queues
    download_queue = asyncio.Queue()
    pages_queue = asyncio.Queue()
    for page_url in article_urls:
        pages_queue.put_nowait(page_url)
    # Create one producer and eight consumers
    producers = [asyncio.create_task(
        url_producer(download_queue, pages_queue, i)) for i in range(1)]
    consumers = [asyncio.create_task(
        pdf_downloader(download_queue, i)) for i in range(8)]
    # Wait for the producers to finish
    await asyncio.gather(*producers)
    # Cancel the consumers
    for consumer in consumers:
        consumer.cancel()
Nothing is ever printed by logger.debug. Terminal output:

pdf_downloader 0 waiting for queue
pdf_downloader 1 waiting for queue
pdf_downloader 2 waiting for queue
pdf_downloader 3 waiting for queue
pdf_downloader 4 waiting for queue
pdf_downloader 5 waiting for queue
pdf_downloader 6 waiting for queue
pdf_downloader 7 waiting for queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
I have used a Queue to pass work between producers and consumers before, and my personal preference is to start my consumers first and my producers second, to make sure the consumers are ready and waiting when work starts to arrive.

I would try the following:
async def main():
    # Create the shared queues
    download_queue = asyncio.Queue()
    pages_queue = asyncio.Queue()
    for page_url in article_urls:
        pages_queue.put_nowait(page_url)
    # Create one producer and eight consumers; start the consumers first
    consumers = [asyncio.create_task(
        pdf_downloader(download_queue, i)) for i in range(8)]
    producers = [asyncio.create_task(
        url_producer(download_queue, pages_queue, i)) for i in range(1)]
    # Wait for the producers and consumers to finish
    await asyncio.gather(*(producers + consumers))
I removed the cancellation of the consumers, because I'm not sure why you would want that if you want them to keep running and processing PDF data. You can obviously adapt this example if you need something like that again.
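If you do eventually want a clean shutdown, one common pattern is to wait for both queues to drain with Queue.join() and only then cancel the worker tasks. This is just a sketch, and it assumes your workers keep calling task_done() for every item, as they do in your code:

async def main():
    download_queue = asyncio.Queue()
    pages_queue = asyncio.Queue()
    for page_url in article_urls:
        pages_queue.put_nowait(page_url)

    consumers = [asyncio.create_task(pdf_downloader(download_queue, i))
                 for i in range(8)]
    producers = [asyncio.create_task(url_producer(download_queue, pages_queue, i))
                 for i in range(1)]

    # join() returns once task_done() has been called for every queued item,
    # so this waits for all pages to be scraped and all PDFs to be uploaded.
    await pages_queue.join()
    await download_queue.join()

    # The workers are now idle inside queue.get(); cancel them and swallow
    # the resulting CancelledError from each task.
    for task in producers + consumers:
        task.cancel()
    await asyncio.gather(*(producers + consumers), return_exceptions=True)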
I also noticed that you are using requests to perform the GET request, which blocks no matter how many asynchronous consumers you run. If you want to run it under asyncio without blocking, you have to use loop.run_in_executor() to run the function in a thread or process pool:
async def pdf_downloader(queue, consumer_id):
    loop = asyncio.get_event_loop()
    while True:
        try:
            print(f"pdf_downloader {consumer_id} waiting for queue")
            entry = await queue.get()
            print(f"pdf_downloader {consumer_id} got queue entry")
            article_id = entry['article_id']
            pdf_url = entry['pdf_url']
            # Run the blocking requests.get() in the default thread pool
            request_future = loop.run_in_executor(None, requests.get, pdf_url)
            response = await request_future
            pdf_content = response.content
            object_key = f"{article_id}.pdf"
            s3.Bucket(bucket_name).put_object(Key=object_key, Body=pdf_content)
            queue.task_done()
        except Exception as e:
            logger.debug(f"Error {e}")
You will probably also want to do the same with the put_object() call from the Amazon Boto library, since it performs blocking network I/O as well.
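You can wrap that call the same way. Here is a minimal sketch; upload_to_s3 is a hypothetical helper, and it assumes the same s3 resource and bucket_name as in your code. Since put_object() only accepts keyword arguments, which run_in_executor() cannot forward directly, functools.partial is a handy way to bind them first:

import functools

async def upload_to_s3(loop, article_id, pdf_content):
    # Bind the keyword arguments with functools.partial, then run the
    # blocking put_object() call in the default thread pool.
    upload = functools.partial(
        s3.Bucket(bucket_name).put_object,
        Key=f"{article_id}.pdf",
        Body=pdf_content,
    )
    await loop.run_in_executor(None, upload)

Inside pdf_downloader you would then replace the direct put_object() call with await upload_to_s3(loop, article_id, pdf_content).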