在“pytest”中同时使用“asyncio.Queue”和“TaskGroup”时,执行会挂起或死锁

问题描述 投票:0回答:1

我是异步编程新手,正在尝试了解如何将 `TaskGroup` 和 `asyncio.Queue` 一起使用。我有下面这个带测试的模块,但在执行 `pytest` 时,它会打印出队列中的每一项,然后就挂起/死锁了。我哪里做错了?有什么建议吗?

模块:

AsynchronousQueueBeta.py

from asyncio import Queue, TaskGroup

class AsynchronousQueueBeta:
    """Asynchronous Queue Beta"""

    async def fetch_recursive(self, source_list: list[str], maximum_connection: int = 10):
        """Fetch Recursive"""
        print('Fetch Recursive')
        query_queue = Queue()

        for source in source_list:
            query_queue.put_nowait(source)

        async with TaskGroup() as group:
            task_list = [
                group.create_task(self.fetch_query(query_queue)) for _ in range(maximum_connection)
            ]

        await query_queue.join()
        result_list = [task.result() for task in task_list]
        print(f'Result List: {result_list}')


    async def fetch_query(self, queue: Queue):
        """Fetch Query"""
        while True:
            query = await queue.get()
            print(f'Query: {query}')
            queue.task_done()

测试:

TestAsynchronousQueueBeta.py

import pytest
from AsynchronousQueueBeta import AsynchronousQueueBeta

class TestAsynchronousQueueBeta:
    """Pytest suite that drives AsynchronousQueueBeta.fetch_recursive."""

    # The single parameter set handed to the parametrized test below.
    _SOURCE_URLS = [
        'https://httpbin.org/anything/1',
        'https://httpbin.org/anything/2',
        'https://httpbin.org/anything/3',
        'https://httpbin.org/anything/4',
        'https://httpbin.org/anything/5',
        'https://httpbin.org/anything/6',
        'https://httpbin.org/anything/7',
        'https://httpbin.org/anything/8',
        'https://httpbin.org/anything/9',
        'https://httpbin.org/anything/10',
        'https://httpbin.org/anything/11',
        'https://httpbin.org/anything/12',
    ]

    @pytest.mark.asyncio
    @pytest.mark.parametrize('source_list', [_SOURCE_URLS])
    async def test_fetch_recursive(self, source_list: list[str]):
        """Await fetch_recursive with the URL list; the test passes if the call completes."""
        queue_runner = AsynchronousQueueBeta()
        await queue_runner.fetch_recursive(source_list=source_list)

结果

platform darwin -- Python 3.12.1, pytest-7.4.4, pluggy-1.3.0 -- /Users/abc/Desktop/Project/Workspace/Python/pv312/bin/python3.12
cachedir: .pytest_cache
rootdir: /Users/abc/Desktop/Project/Async
configfile: pytest.ini
plugins: asyncio-0.23.3, anyio-4.2.0
asyncio: mode=Mode.STRICT
collected 1 item

Test/TestAsynchronousQueueBeta.py::TestAsynchronousQueueBeta::test_fetch_recursive[source_list0] Fetch Recursive
Query: https://httpbin.org/anything/1
Query: https://httpbin.org/anything/2
Query: https://httpbin.org/anything/3
Query: https://httpbin.org/anything/4
Query: https://httpbin.org/anything/5
Query: https://httpbin.org/anything/6
Query: https://httpbin.org/anything/7
Query: https://httpbin.org/anything/8
Query: https://httpbin.org/anything/9
Query: https://httpbin.org/anything/10
Query: https://httpbin.org/anything/11
Query: https://httpbin.org/anything/12
^C

!!! KeyboardInterrupt !!!
/opt/python/3.12.1/lib/python3.12/selectors.py:566: KeyboardInterrupt
(to show a full traceback on KeyboardInterrupt use --full-trace)
...
python pytest python-asyncio pytest-asyncio
1个回答
0
投票

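我想我(在某种程度上)已经弄清楚了:原代码中的工作协程在 `while True` 里永远等待 `queue.get()`,因此 `async with TaskGroup()` 永远不会退出,其后的 `query_queue.join()` 也就永远执行不到。我的做法是为每个工作协程向队列中放入一个信号值(哨兵值 `None`),告诉它队列已经处理完毕、可以退出。下面是一个示例,我还有一个带测试的完整示例。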

async def fetch_recursive(
    self,
    source_list: list[str],
    maximum_task: int = 10,
):
    print('Fetch Recursive')
    query_queue = Queue()
    result_queue = Queue()

    async with TaskGroup() as group:
        task_list = [
            group.create_task(
                self.fetch_query(
                    name=f'Worker-{index + 1}',
                    query_queue=query_queue,
                    result_queue=result_queue,
                )
            ) for index in range(maximum_task)
        ]

        for source in source_list:
            await query_queue.put(source)

        for _ in range(maximum_task):
            await query_queue.put(None)

    result_list = []
    while not result_queue.empty():
        result = await result_queue.get()
        result_list.append(result)

    # Display the result(s)
    for result in result_list:
        print(f'Result: {result}')

async def fetch_query(
    self,
    name: str,
    query_queue: Queue,
    result_queue: Queue,
):
    """Worker loop: take queries until a ``None`` stop marker arrives.

    For each query the worker prints a progress line, sleeps for one second,
    and puts a ``'<name> Result: <query>'`` string on *result_queue*.
    """
    while (query := await query_queue.get()) is not None:
        print(f'{name} Fetch Query: {query}')

        await asyncio.sleep(1)

        result = f'{name} Result: {query}'
        await result_queue.put(result)
© www.soinside.com 2019 - 2024. All rights reserved.