Pytest 两个带有无限循环和等待命令的异步函数

问题描述 投票:0回答:1

我正在对 python 代码进行单元测试。它有一个异步循环。该循环中的两个主要函数都有无限的 while 循环。我已经测试了代码的非异步部分。我想知道对这两个函数进行单元测试的最佳策略是什么:

import asyncio

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_async_tasks())
    .
    .
    

async def run_async_tasks():
    # initialize loop and its variables
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(maxsize=100)

    # Creating Asyncio tasks
    task1 = loop.create_task(receive_data(queue), name="receive_data")
    task2 = loop.create_task(analyse_data(queue), name="analyse_data")

    await asyncio.gather(task1, task2)

async def receive_data(queue):
    # Data packets are yielded from an non-ending stream by sync_receive_data_packets
    for data_packet in sync_receive_data_packets():
        await queue.put(data_packet)

async def analyse_data(queue):
    while not termination_signal_received():
        data_packet = await queue.get()
        sync_data_analysis(data_packet)
        queue.task_done()

我的问题是是否/如何可以进行单元测试

receive_data
analyse_data

为了简单起见,读取数据 (

sync_receive_data_packets
) 和执行分析 (
sync_data_analysis
) 的主要逻辑位于非异步函数中。他们已成功通过测试。我不确定如何测试
await queue.put
await queue.get()
部分,特别是当它们处于无限循环中时。 事实上,我想知道这些函数是否可以进行单元测试,因为它们不返回任何内容。一个将一项放入队列,另一个读取它。

python pytest python-asyncio pytest-asyncio
1个回答
0
投票

这两个函数都不是“无尽的”——第一个函数将在

sync_receive_data_packets
返回的生成器耗尽时结束,而另一个则在
termination_signal_received()
返回
True
时结束。只需使用
mock.patch
将这两个函数指向您的测试控制下的可调用函数。

© www.soinside.com 2019 - 2024. All rights reserved.