Airflow 具有内置的 dag 和任务测试支持,但不足以以并非所有任务都需要执行的方式测试任务。
目标是测试 python 可调用对象。它可以通过 unittest 或任何其他库来完成,但可调用对象之间的 xcom 是一个问题。
如何模拟 xcom,以便在可调用函数中按预期模拟函数 xcom_pull 和 xcom_push?
以下对我有用
from unittest import TestCase
from unittest.mock import Mock
class Tasks(TestCase):
def setUp(self) -> None:
"""setup context mock passed to python callable"""
self.context = dict()
self.context['ti'] = Mock()
self.xcom = XCom()
self.context['ti'].xcom_pull.side_effect = self.xcom.side_effect
return super().setUp()
def test_tasks(self):
# testing python callable functions foo and cat
# if foo requires return value from another task
# through xcom_pull(task_ids='bar')
self.xcom.args(5, task_ids = 'bar')
# if foo requires xcom_pull(task_ids='bar', key='joe')
self.xcom.args(7, task_ids = 'bar', key = 'joe')
result = foo(**self.context)
# test the result e.g
self.assertTrue(result)
# if python callable e.g cat, needs to be chained and
# needs xcom_push values from foo
self.xcom.xcom_push(self.context['ti'].xcom_push.call_args_list,'c')
# and to get the return value using xcom_pull(task_ids='foo')
self.xcom.args(result, task_ids='foo')
result = cat(**self.context)
self.assertTrue(result)
下面是 XCom 类
class XCom:
"""mocks xcom"""
def __init__(self):
self.map=[]
self.last_args_index = 0
def side_effect(self, *args, **kwargs):
"""mock xcom_pull"""
for result, sig in self.map:
if sig == kwargs:
return result
raise Exception('signature not found')
def args(self, result, **kwargs):
"""appending data to simulate xcom_pull"""
self.map.append((result, kwargs))
def xcom_push(self, call_args_list, task):
"""saving xcom_push data"""
for args in call_args_list[self.last_args_index:len(call_args_list)]:
result = args.kwargs.pop('value')
kwargs = dict()
kwargs['task_ids'] = task
kwargs.update(args.kwargs)
self.args(result, **kwargs)
self.last_args_index += 1