How do I mock XCom to test Airflow Python callable tasks?


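Airflow has built-in support for testing DAGs and tasks, but it is not enough when you want to test a single task without executing all the others.

The goal is to test the Python callables. That can be done with unittest or any other library, but the XComs exchanged between callables are a problem.

How can XCom be mocked so that xcom_pull and xcom_push behave as expected inside the callable functions?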

airflow python-unittest
1 answer

The following worked for me:

from unittest import TestCase
from unittest.mock import Mock

# foo and cat are the Python callables under test; import them from
# your own DAG module. XCom is the helper class shown further below.


class Tasks(TestCase):

    def setUp(self) -> None:
        """Set up the context mock that is passed to the Python callable."""
        super().setUp()
        self.context = {'ti': Mock()}
        self.xcom = XCom()
        # Route every ti.xcom_pull(...) call to the XCom helper.
        self.context['ti'].xcom_pull.side_effect = self.xcom.side_effect

    def test_tasks(self):
        # Test the Python callables foo and cat.

        # If foo needs the return value of another task
        # through xcom_pull(task_ids='bar'):
        self.xcom.args(5, task_ids='bar')

        # If foo needs xcom_pull(task_ids='bar', key='joe'):
        self.xcom.args(7, task_ids='bar', key='joe')

        result = foo(**self.context)
        # Check the result, e.g.:
        self.assertTrue(result)

        # If another callable, e.g. cat, is chained after foo and needs the
        # values foo sent through xcom_push, replay them under foo's task id:
        self.xcom.xcom_push(self.context['ti'].xcom_push.call_args_list, 'foo')
        # To make foo's return value available through xcom_pull(task_ids='foo'):
        self.xcom.args(result, task_ids='foo')

        result = cat(**self.context)
        self.assertTrue(result)

Below is the XCom class:

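class XCom:
    """Mocks XCom for a mocked task instance."""

    def __init__(self):
        self.map = []  # list of (result, xcom_pull keyword arguments) pairs
        self.last_args_index = 0  # number of xcom_push calls already replayed

    def side_effect(self, *args, **kwargs):
        """Mock xcom_pull: return the result registered for these keyword arguments."""
        # Positional arguments are ignored; matching is on keyword arguments only.
        for result, sig in self.map:
            if sig == kwargs:
                return result

        raise KeyError(f'no xcom_pull result registered for {kwargs}')

    def args(self, result, **kwargs):
        """Register the result that xcom_pull(**kwargs) should return."""
        self.map.append((result, kwargs))

    def xcom_push(self, call_args_list, task):
        """Replay recorded xcom_push calls so that xcom_pull can return them."""
        # Only handle the calls made since the previous replay.
        for call in call_args_list[self.last_args_index:]:
            # Copy so the recorded call is not mutated; xcom_push must have
            # been called with keyword arguments (key=..., value=...).
            kwargs = dict(call.kwargs)
            result = kwargs.pop('value')
            self.args(result, task_ids=task, **kwargs)
            self.last_args_index += 1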
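
For comparison, here is a minimal self-contained sketch of a different way to get the same effect: a small fake task instance that stores pushed values in a dict instead of replaying a Mock's recorded calls. Everything in it is an assumption made for illustration: FakeTaskInstance, its run helper, and the sample callables foo and cat are hypothetical names, not part of Airflow or of the code above. The only Airflow detail it relies on is that a task's return value is stored under the key "return_value".

```python
import unittest

RETURN_KEY = "return_value"  # key under which Airflow stores a task's return value


class FakeTaskInstance:
    """Hypothetical stand-in for context['ti']; not an Airflow class."""

    def __init__(self):
        self.task_id = None  # id of the task currently "running"
        self._store = {}     # (task_id, key) -> value

    def xcom_push(self, key, value):
        # Record the value under the task that is currently running.
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key=RETURN_KEY):
        # Raises KeyError for unknown entries, which makes typos in a test obvious.
        return self._store[(task_ids, key)]

    def run(self, task_id, func):
        """Call func as task_id and store its return value as its XCom."""
        self.task_id = task_id
        result = func(ti=self)
        self._store[(task_id, RETURN_KEY)] = result
        return result


# Hypothetical callables under test.
def foo(ti, **_):
    base = ti.xcom_pull(task_ids="bar")
    ti.xcom_push(key="doubled", value=base * 2)
    return base + 1


def cat(ti, **_):
    return ti.xcom_pull(task_ids="foo") + ti.xcom_pull(task_ids="foo", key="doubled")


class TasksTest(unittest.TestCase):
    def test_chain(self):
        ti = FakeTaskInstance()
        ti.run("bar", lambda ti: 5)  # seed the upstream value foo pulls
        self.assertEqual(ti.run("foo", foo), 6)
        self.assertEqual(ti.run("cat", cat), 16)
```

The trade-off is that the fake only supports the two calls it defines, whereas a Mock accepts any attribute access; on the other hand, chained callables need no manual replay step between them.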