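I am trying to implement what looks like a standard pattern: a single producer feeding a queue that is drained by multiple consumers. I use aiohttp, so my producer is asynchronous, but the library I need for consuming is synchronous. I therefore run the consumers in a thread executor and use the janus library for a queue that exposes both an async and a sync interface. My minimal example is: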
import asyncio
from time import sleep
from janus import Queue


async def test_concurrency():
    iterations = 10  # or 10k, or 1M

    async def produce_data(q):
        nonlocal iterations
        print('starting producing')
        for raw_data in range(iterations):
            await q.put(raw_data)
        print('finishing feeding data')
        return None

    def consume_data(q, i):
        print(f"{i} init")
        print(f"{i} start")
        try:
            for datapoint in iter(q.get, None):
                print(f"{i} cycle start")
                # Do some work
                sleep(1)
                q.task_done()
                print(f"{i} consumed")
                print(f"{i} cycle end")
        except Exception as e:
            print('error is:')
            print(e)
            print('consumer exiting on error')
            raise
        print(f"{i} producer exit")

    loop = asyncio.get_running_loop()
    q = Queue(30)
    producer = asyncio.create_task(produce_data(q.async_q))
    consumers = [
        loop.run_in_executor(None, consume_data, q.sync_q, x) for x in range(30)
    ]
    await asyncio.wait({producer})
    print('---- done producing')
    for _ in consumers:
        await q.async_q.put(None)
    await asyncio.wait({*consumers})
    for c in consumers:
        print('canceling')
        c.cancel()
    print('---- done consuming')


def main():
    asyncio.get_event_loop().run_until_complete(test_concurrency())


if __name__ == '__main__':
    main()
As long as the range(..) argument used when creating the consumers stays below 11, everything works as expected:
0 init
0 start
1 init
...
starting producing
8 init
8 start
finishing feeding data
---- done producing
8 cycle start
0 cycle start
...
1 consumed
1 cycle end
1 cycle start
2 consumed
2 cycle end
...
canceling
canceling
...
---- done consuming
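As soon as I use more than 10 consumers (for example the 30 in the sample code), no error is raised, but the work always seems to happen only in the i = 9 consumer: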
0 start
1 init
1 start
...
starting producing
finishing feeding data
---- done producing
9 init
9 start
9 cycle start
9 consumed
9 cycle end
9 cycle start
9 consumed
9 cycle end
...
29 producer exit
...
2 producer exit
canceling
...
---- done consuming
Is there something wrong with the way I am using the async flow?
Thanks in advance.
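OK, there are two problems here. One is obvious: run_in_executor creates a default pool for you whose max_workers is your cpu_count + 4 (more precisely, min(32, os.cpu_count() + 4) on Python 3.8 and later), so on a machine with fewer than 16 cores range(20) will never result in all the consumers running at the same time.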
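To make the pool-size point concrete, here is a minimal sketch using only the standard library. It computes the documented Python 3.8+ default for max_workers and then shows that tasks submitted beyond the pool size do not start while the earlier ones are blocked. The names blocking_consumer, started and release are hypothetical, and the pool is deliberately limited to 2 workers so the effect is easy to see; it is an illustration of the queuing behaviour, not a reproduction of the janus setup.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Documented default for ThreadPoolExecutor() on Python 3.8+.
default_workers = min(32, (os.cpu_count() or 1) + 4)
print("default max_workers:", default_workers)

started = []                 # ids of tasks that actually began running
release = threading.Event()  # lets the blocked tasks finish


def blocking_consumer(i):
    # Hypothetical stand-in for a consumer stuck in a blocking q.get().
    started.append(i)
    release.wait()
    return i


# Deliberately tiny pool (assumption for the demo): 2 workers, 5 tasks.
pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(blocking_consumer, i) for i in range(5)]

time.sleep(0.2)                          # give the worker threads time to start
started_while_blocked = sorted(started)  # only the first two tasks are running
print("started while blocked:", started_while_blocked)

release.set()                            # unblock everything
pool.shutdown(wait=True)
results = sorted(f.result() for f in futures)
```

The remaining tasks only get a thread once an earlier task returns, which is why the number of long-running consumers should not exceed max_workers.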
The other one (all the work ending up in the tenth thread) is trickier. This:
consumers = [loop.run_in_executor(None, consume_data, q.sync_q, x) for x in range(20)]

# in BaseEventLoop.run_in_executor:
    ...
    if executor is None:
        executor = concurrent.futures.ThreadPoolExecutor()
        self._default_executor = executor
leaves only one thread actually consuming, while the seemingly functionally identical:
executor = concurrent.futures.ThreadPoolExecutor()
consumers = [loop.run_in_executor(executor, consume_data, q.sync_q, x) for x in range(20)]
results in 8 threads (or whatever the default thread count is) being active.
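The solution from the original answer is to create the ThreadPoolExecutor explicitly, but I still cannot explain why a single thread hogs the queue when the default pool is used.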
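For reference, a minimal sketch of the explicit-executor approach. To keep it free of third-party dependencies it swaps janus for the standard library's queue.Queue, left unbounded so that put_nowait never blocks the event loop; with janus you would pass q.sync_q to the consumers and await q.async_q.put(...) instead. N_CONSUMERS, N_ITEMS, consume and seen are hypothetical names, and the pool is sized to one thread per consumer, which is my assumption rather than something the original answer specifies.

```python
import asyncio
import queue
import time
from concurrent.futures import ThreadPoolExecutor

N_CONSUMERS = 20  # hypothetical: matches range(20) above
N_ITEMS = 40      # hypothetical workload size


def consume(q, i, seen):
    # Blocking consumer: stops when it reads the None sentinel.
    for item in iter(q.get, None):
        time.sleep(0.01)        # stand-in for the synchronous work
        seen.append((i, item))  # list.append is thread-safe in CPython
        q.task_done()


async def run():
    loop = asyncio.get_running_loop()
    q = queue.Queue()  # unbounded, so put_nowait never blocks the loop
    seen = []
    # Explicit pool with one thread per consumer, so no consumer is left
    # waiting in the executor's backlog.
    with ThreadPoolExecutor(max_workers=N_CONSUMERS) as pool:
        consumers = [
            loop.run_in_executor(pool, consume, q, i, seen)
            for i in range(N_CONSUMERS)
        ]
        for item in range(N_ITEMS):
            q.put_nowait(item)
        for _ in consumers:
            q.put_nowait(None)  # one sentinel per consumer
        await asyncio.gather(*consumers)
    return seen


seen = asyncio.run(run())
print(len(seen), "items consumed by", len({i for i, _ in seen}), "consumers")
```

Sizing the pool to the number of consumers matters because each consumer holds its thread until it sees a sentinel; a smaller pool would leave the extra consumers queued until an earlier one exits.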