我正在对 python 代码进行单元测试。它有一个异步循环。该循环中的两个主要函数都有无限的 while 循环。我已经测试了代码的非异步部分。我想知道对这两个函数进行单元测试的最佳策略是什么:
import asyncio
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run_async_tasks())
.
.
async def run_async_tasks():
# initialize loop and its variables
loop = asyncio.get_event_loop()
queue = asyncio.Queue(maxsize=100)
# Creating Asyncio tasks
task1 = loop.create_task(receive_data(queue), name="receive_data")
task2 = loop.create_task(analyse_data(queue), name="analyse_data")
await asyncio.gather(task1, task2)
async def receive_data(queue):
# Data packets are yielded from an non-ending stream by sync_receive_data_packets
for data_packet in sync_receive_data_packets():
await queue.put(data_packet)
async def analyse_data(queue):
while not termination_signal_received():
data_packet = await queue.get()
sync_data_analysis(data_packet)
queue.task_done()
我的问题是是否/如何可以进行单元测试
receive_data
和analyse_data
。
为了简单起见,读取数据 (
sync_receive_data_packets
) 和执行分析 (sync_data_analysis
) 的主要逻辑位于非异步函数中。他们已成功通过测试。我不确定如何测试 await queue.put
和 await queue.get()
部分,特别是当它们处于无限循环中时。
事实上,我想知道这些函数是否可以进行单元测试,因为它们不返回任何内容。一个将一项放入队列,另一个读取它。
这两个函数都不是“无尽的”——第一个函数将在
sync_receive_data_packets
返回的生成器耗尽时结束,而另一个则在 termination_signal_received()
返回 True
时结束。只需使用 mock.patch
将这两个函数指向您的测试控制下的可调用函数。