Using more than 10 consumer threads with one async producer makes the 10th consumer hog the queue


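I'm trying to implement what seems to be the standard single-producer, multiple-consumer queue pattern. I use aiohttp, so my producer is asynchronous, but the library I need for the consuming side is synchronous. So I run the consumers in a thread executor and use the janus library, whose queue offers both an async and a sync interface. My minimal example is: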

import asyncio
from time import sleep
from janus import Queue


async def test_concurrency():
    iterations = 10 # or 10k, or 1M

    async def produce_data(q):
        nonlocal iterations
        print('starting producing')
        for raw_data in range(iterations):
            await q.put(raw_data)
        print('finishing feeding data')
        return None

    def consume_data(q, i):
        print(f"{i} init")
        print(f"{i} start")
        try:
            for datapoint in iter(q.get, None):
                print(f"{i} cycle start")
                # Do some work
                sleep(1)
                q.task_done()
                print(f"{i} consumed")
                print(f"{i} cycle end")
        except Exception as e:
            print('error is:')
            print(e)
            print('consumer exiting on error')
            raise
        print(f"{i} producer exit")

    loop = asyncio.get_running_loop()
    q = Queue(30)
    producer = asyncio.create_task(produce_data(q.async_q))
    consumers = [
        loop.run_in_executor(None, consume_data, q.sync_q, x) for x in range(30)
    ]

    await asyncio.wait({producer})
    print('---- done producing')
    for _ in consumers:
        await q.async_q.put(None)
    await asyncio.wait({*consumers})
    for c in consumers:
        print('canceling')
        c.cancel()

    print('---- done consuming')


def main():
    asyncio.get_event_loop().run_until_complete(test_concurrency())


if __name__ == '__main__':
    main()

As long as the range(..) argument in the list comprehension that creates the consumers stays below 11, everything works as expected:

0 init
0 start
1 init
...
starting producing
8 init
8 start
finishing feeding data
---- done producing
8 cycle start
0 cycle start
...
1 consumed
1 cycle end
1 cycle start
2 consumed
2 cycle end
...
canceling
canceling
...
---- done consuming

As soon as I use more than 10 consumers (e.g. 30, as in the example code), no error is raised, but all the work always seems to happen in consumer i = 9:

0 start
1 init
1 start
...
starting producing
finishing feeding data
---- done producing
9 init
9 start
9 cycle start
9 consumed
9 cycle end
9 cycle start
9 consumed
9 cycle end
...
29 producer exit
...
2 producer exit
canceling
...
---- done consuming

Is there something wrong with how I'm using the async flow?

Thanks in advance.

python asynchronous async-await python-asyncio aiohttp
1 Answer

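OK, there are two issues here. The obvious one: run_in_executor creates a default pool for you, and that pool's max_workers defaults to min(32, os.cpu_count() + 4) on Python 3.8+. So unless the machine has at least 16 CPUs, range(20) can never have all consumers running at the same time; the surplus consumers simply wait in the executor's internal queue.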
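A minimal stdlib sketch of that cap, assuming Python 3.8 to 3.12 (3.13+ computes the default from os.process_cpu_count() instead). The helper peak_concurrency and its 0.05 s sleep are hypothetical stand-ins for blocked consumers: however many tasks are submitted, no more than max_workers of them ever run at once.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# ThreadPoolExecutor's default max_workers on Python 3.8-3.12.
default_workers = min(32, (os.cpu_count() or 1) + 4)


def peak_concurrency(n_tasks: int, max_workers: int) -> int:
    """Run n_tasks blocking tasks and return the most that ran at the same time."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task() -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for a consumer blocked on a queue
        with lock:
            running -= 1

    # Leaving the with-block waits for every submitted task to finish.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(n_tasks):
            pool.submit(task)
    return peak


print("default max_workers:", default_workers)
# 20 tasks are submitted, but at most 4 are ever running at once.
print("peak with 4 workers:", peak_concurrency(20, max_workers=4))
```

Sizing max_workers to at least the number of consumers is what lets every consumer run simultaneously.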

The other one (the 10th thread hogging all the work) is trickier. This:

consumers = [loop.run_in_executor(None, consume_data, q.sync_q, x) for x in range(20)]

# in BaseEventLoop.run_in_executor:
    ...
    if executor is None:
        executor = concurrent.futures.ThreadPoolExecutor()
        self._default_executor = executor

makes only one thread actually consume, whereas this seemingly equivalent version:

executor = concurrent.futures.ThreadPoolExecutor()
consumers = [loop.run_in_executor(executor, consume_data, q.sync_q, x) for x in range(20)]

results in 8 threads (or however many the default pool size is) being active.
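A rough sketch of the explicit-executor approach. It swaps janus for a plain queue.Queue (an assumption made so the example runs with the standard library only), so it illustrates the executor layout rather than janus itself: the consumers get a dedicated ThreadPoolExecutor with one thread per consumer, and the producer's potentially blocking q.put calls go through the loop's default executor, which stays idle. The names consume and run are illustrative.

```python
import asyncio
import queue
import time
from concurrent.futures import ThreadPoolExecutor


def consume(q: queue.Queue) -> int:
    """Blocking consumer: process items until the None sentinel, return the count."""
    handled = 0
    for _item in iter(q.get, None):
        time.sleep(0.02)  # simulated synchronous work
        handled += 1
    return handled


async def run(n_consumers: int = 30, n_items: int = 100) -> list:
    loop = asyncio.get_running_loop()
    q: queue.Queue = queue.Queue(maxsize=30)
    # Dedicated pool with one thread per consumer: every consumer runs at once,
    # and the loop's default executor is left free for other work.
    with ThreadPoolExecutor(max_workers=n_consumers) as pool:
        consumers = [loop.run_in_executor(pool, consume, q) for _ in range(n_consumers)]
        for item in range(n_items):
            # q.put blocks when the queue is full, so keep it off the loop thread.
            await loop.run_in_executor(None, q.put, item)
        for _ in range(n_consumers):
            await loop.run_in_executor(None, q.put, None)  # one sentinel per consumer
        return await asyncio.gather(*consumers)


counts = asyncio.run(run())
print("items consumed:", sum(counts))
print("consumers that did work:", sum(1 for c in counts if c))
```

Because the consumer threads never compete with the default executor, anything else the loop schedules there still gets a worker.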

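The solution from the original answer is to create the ThreadPoolExecutor explicitly, but I still can't explain why a single thread hogs the queue when the default pool is used.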
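One hypothesis for the remaining puzzle, which is an assumption not confirmed in this thread and worth checking against the janus source for your version: janus may wake a thread blocked in sync_q.get() by scheduling a small notifier function on the loop's default executor (run_in_executor(None, ...)). If every default worker is already occupied by a blocked consumer, that notifier waits in the executor's queue behind the not-yet-started consumers, so the blocked consumers are not woken, while a consumer that starts after the items are already queued (like consumer 9) can drain the queue without ever waiting. This stdlib sketch only illustrates that starvation effect with a one-worker default executor; blocked_consumer and demo are hypothetical names.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocked_consumer(wakeup: threading.Event) -> bool:
    # Stand-in for a consumer blocked in a synchronous get(): True if it was
    # woken by the notifier, False if it gave up after the timeout.
    return wakeup.wait(timeout=1.0)


async def demo(share_default_pool: bool) -> bool:
    loop = asyncio.get_running_loop()
    # A one-worker default executor is saturated as soon as one consumer runs.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
    wakeup = threading.Event()
    consumer_pool = None if share_default_pool else ThreadPoolExecutor(max_workers=1)
    consumer = loop.run_in_executor(consumer_pool, blocked_consumer, wakeup)
    # The notifier always goes through the default executor (executor=None),
    # which is what we assume janus does internally.
    notifier = loop.run_in_executor(None, wakeup.set)
    woken = await consumer
    await notifier
    if consumer_pool is not None:
        consumer_pool.shutdown()
    return woken


# Shared pool: the notifier is stuck behind the consumer, which times out (False).
print("shared default pool, consumer woken:", asyncio.run(demo(True)))
# Separate pool: the idle default executor runs the notifier at once (True).
print("separate consumer pool, consumer woken:", asyncio.run(demo(False)))
```

If the hypothesis holds, it would also explain why the explicit executor helps: the consumers no longer occupy the default pool, so the notifiers get a free worker. Enlarging the default pool with loop.set_default_executor(ThreadPoolExecutor(max_workers=...)) would be another way to test it.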
