我正在尝试测试我编写的一些异步代码。结构是:
await
ing 在一些异步 IO 调用上asyncio.gather
调用,等待每个内部循环。
我构建了一个玩具示例:import asyncio
async def loop_inner(worker, worker_num, tasks):
while tasks:
task = tasks.pop()
print('Worker number {} handled task number {}'.format(
worker_num, await worker.do_work(task)))
async def loop(workers, tasks):
tasks = [loop_inner(worker, worker_num, tasks) for
worker_num, worker in enumerate(workers)]
await asyncio.gather(*tasks)
在实际工作负载上运行时,该结构运行良好。高吞吐量、善用并行性等
问题是我要测试的时候。我想编写测试来模拟跨不同工作人员的任务分配。但是,我只想测试分发逻辑,同时模拟工作代码本身。我的本能冲动是用
AsyncMock
对象代替真正的工人。问题是,当我运行这个测试用例时,所有的工作都由一个工人处理:
from unittest import IsolatedAsyncioTestCase, main
from unittest.mock import ANY, AsyncMock, patch
class TestCase(IsolatedAsyncioTestCase):
async def test_yielding(self):
tasks = list(range(10))
workers = [AsyncMock() for i in range(2)]
for worker in workers:
worker.do_work.side_effect = lambda task: task
await loop(workers, tasks)
main()
我的输出如下:
Worker number 0 handled task number 9
Worker number 0 handled task number 8
Worker number 0 handled task number 7
Worker number 0 handled task number 6
Worker number 0 handled task number 5
Worker number 0 handled task number 4
Worker number 0 handled task number 3
Worker number 0 handled task number 2
Worker number 0 handled task number 1
Worker number 0 handled task number 0
什么给?为什么只有一个工人处理所有的工作?事件循环是否没有将控制权交给其他工作人员?
AsyncMock
协程真的没有屈服控制吗?我该怎么做才能更真实地测试它?
为什么只有一个工人处理所有的工作?
调度器永远不会在量程到期时中断,因为 我们在这里使用协作线程。 所以合作.
计算
list(range(10))
永远不会放弃控制权。
在
sleep
中用简短的 lambda
自愿放弃控制,
或将其构造为异步函数,
让调度程序有机会保留多个盘子
同时旋转。