Consumer/producer queue .get() loads forever

Question (votes: 0, answers: 1)

I have a list of URLs in article_urls: list[str], and I am trying to create several workers that visit these URLs, scrape a different URL from each web page, and put that URL into another queue. I want several other workers to go through that other queue, download the files at those URLs, and then upload them to S3. I think the program stalls at entry = await queue.get() and never moves on. How do I fix this?

def get_first_pass_or_none(inp, driver):
    # Return the result of the first extractor in `inp` that succeeds on `driver`, otherwise None
    for x in inp:
        try:
            return x(driver)
        except:
            pass
    return None


async def url_producer(download_queue, pages_queue, producer_id):
    first = True
    while True:
        try:
            article_url = await pages_queue.get()
            instance_driver = wd[producer_id]  # or any other webdriver
            instance_driver.get(article_url)
            article_id = str(uuid.uuid4())

            if first:
                await asyncio.sleep(2)
                instance_driver.find_element(
                    By.XPATH, '//*[@id="onetrust-close-btn-container"]/button').click()
                first = False

            article_title = get_first_pass_or_none([lambda instance_driver: instance_driver.find_element(
                By.XPATH, '//*[@id="documentTitle"]').text], instance_driver)
            author = get_first_pass_or_none([lambda instance_driver: re.sub(r"\([^()]*\)", "", instance_driver.find_element(
                By.XPATH, '//*[@id="authordiv"]/span[2]/span/a/strong')
                .text)], instance_driver)
            publication_info = get_first_pass_or_none([lambda instance_driver: instance_driver.find_element(
                By.XPATH, '//*[@id="authordiv"]/span[2]/span').text], instance_driver)
            publication_location = get_first_pass_or_none([lambda publication_info: re.findall(
                r'\[(.*?)\]', publication_info)[0]], instance_driver)
            publication_date = publication_info

            output_metadata[article_id] = {
                "title": article_title,
                "author": author,
                "location": publication_location,
                "date": publication_date
            }
            pdf_url = instance_driver.find_element(
                By.CLASS_NAME, 'pdf-download').get_attribute('href')
            await download_queue.put({
                "article_id": article_id,
                "pdf_url": pdf_url,
            })
            print(download_queue.qsize())
            pages_queue.task_done()
        except Exception as e:
            logger.debug(f"Error {e}")
            keyboard.wait(keys[producer_id])


async def pdf_downloader(queue, consumer_id):
    while True:
        try:
            print(f"pdf_downloader {consumer_id} waiting for queue")
            entry = await queue.get()
            print(f"pdf_downloader {consumer_id} got queue entry")
            article_id = entry['article_id']
            pdf_url = entry['pdf_url']
            response = requests.get(pdf_url)
            pdf_content = response.content
            object_key = f"{article_id}.pdf"
            s3.Bucket(bucket_name).put_object(Key=object_key, Body=pdf_content)
            queue.task_done()
        except Exception as e:
            logger.debug(f"Error {e}")


async def main():
    # Create a shared queue
    download_queue = asyncio.Queue()
    pages_queue = asyncio.Queue()
    for page_url in article_urls:
        pages_queue.put_nowait(page_url)

    # Create one producer and eight consumers
    producers = [asyncio.create_task(
        url_producer(download_queue, pages_queue, i)) for i in range(1)]
    consumers = [asyncio.create_task(
        pdf_downloader(download_queue, i)) for i in range(8)]

    # Wait for the producers to finish
    await asyncio.gather(*producers)

    # Cancel the consumers
    for consumer in consumers:
        consumer.cancel()

Nothing is logged via logger.debug. Terminal output:

pdf_downloader 0 waiting for queue
pdf_downloader 1 waiting for queue
pdf_downloader 2 waiting for queue
pdf_downloader 3 waiting for queue
pdf_downloader 4 waiting for queue
pdf_downloader 5 waiting for queue
pdf_downloader 6 waiting for queue
pdf_downloader 7 waiting for queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

python asynchronous python-asyncio producer-consumer
1 Answer

-1 votes

I have used a Queue to pass work between producers and consumers before, and my personal preference is to start my consumers first and then my producers, to make sure the consumers are ready and waiting when the work arrives.

I would try the following:

async def main():
    # Create a shared queue
    download_queue = asyncio.Queue()
    pages_queue = asyncio.Queue()
    for page_url in article_urls:
        pages_queue.put_nowait(page_url)

    # Create one producer and eight consumers
    consumers = [asyncio.create_task(
        pdf_downloader(download_queue, i)) for i in range(8)]
    producers = [asyncio.create_task(
        url_producer(download_queue, pages_queue, i)) for i in range(1)]
    
    # Wait for the producers and consumers to finish
    await asyncio.gather(*(producers + consumers))

I removed the cancellation of the consumers, because I am not sure why you would want that if you want them running and processing the PDF data. You can obviously adapt this example if you do need something like that again.
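If you do want to stop the workers once every article has been handled, one common asyncio pattern is to wait on queue.join() (which relies on the task_done() calls you already make) and then cancel the worker tasks. This is only a minimal sketch, assuming the same url_producer, pdf_downloader and article_urls as above, and assuming each worker calls task_done() exactly once per item it takes:

import asyncio

async def main():
    download_queue = asyncio.Queue()
    pages_queue = asyncio.Queue()
    for page_url in article_urls:
        pages_queue.put_nowait(page_url)

    # Start the consumers first, then the producer
    consumers = [asyncio.create_task(
        pdf_downloader(download_queue, i)) for i in range(8)]
    producers = [asyncio.create_task(
        url_producer(download_queue, pages_queue, i)) for i in range(1)]

    # Wait until every page URL and every queued PDF has been marked task_done()
    await pages_queue.join()
    await download_queue.join()

    # All work is finished, so stop the (still looping) workers
    for task in producers + consumers:
        task.cancel()
    await asyncio.gather(*(producers + consumers), return_exceptions=True)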

I also noticed that you are using requests to perform the GET request, which will block no matter how many asynchronous consumers you run. If you want to run it under asyncio without blocking, you have to use loop.run_in_executor() to run the blocking call in a thread or process pool:
async def pdf_downloader(queue, consumer_id):
    loop = asyncio.get_event_loop()
    while True:
        try:
            print(f"pdf_downloader {consumer_id} waiting for queue")
            entry = await queue.get()
            print(f"pdf_downloader {consumer_id} got queue entry")
            article_id = entry['article_id']
            pdf_url = entry['pdf_url']
            # Run the blocking requests.get call in the default thread pool
            request_future = loop.run_in_executor(None, requests.get, pdf_url)
            response = await request_future
            pdf_content = response.content
            object_key = f"{article_id}.pdf"
            s3.Bucket(bucket_name).put_object(Key=object_key, Body=pdf_content)
            queue.task_done()
        except Exception as e:
            logger.debug(f"Error {e}")

You will probably also need to do the same for the put_object() call from the Amazon boto library.
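The boto3 S3 calls are synchronous as well, so the upload will block the event loop while the PDF is being sent. Here is a minimal sketch of wrapping it the same way, assuming the s3 resource and bucket_name from your code; the helper name upload_pdf_to_s3 is just for illustration, and functools.partial is used because run_in_executor only forwards positional arguments:

import asyncio
import functools

async def upload_pdf_to_s3(article_id, pdf_content):
    loop = asyncio.get_event_loop()
    object_key = f"{article_id}.pdf"
    # Run the blocking boto3 upload in the default thread pool
    await loop.run_in_executor(
        None,
        functools.partial(
            s3.Bucket(bucket_name).put_object,
            Key=object_key,
            Body=pdf_content,
        ),
    )

You would then call await upload_pdf_to_s3(article_id, pdf_content) from pdf_downloader instead of calling put_object() directly.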
