我正在尝试如下嘲笑
airflow.operators.python.get_current_context
:
@pytest.fixture
def _mock_get_current_context(mocker):
mocker.patch(
"airflow.operators.python.get_current_context",
return_value={},
)
此模式适用于我正在嘲笑的所有其他函数,例如
requests.get
、airflow.operators.trigger_dagrun.TriggerDagRunOperator.execute
和 requests.Response.content
。但是,当我在 DAG 任务中调用 get_current_context()
时,出现以下错误:
if not _CURRENT_CONTEXT:
raise AirflowException(
"Current context was requested but no context was found! "
"Are you running within an airflow task?"
)
E airflow.exceptions.AirflowException: Current context was requested but no
context was found! Are you running within an airflow task?
表明模拟不起作用,因为
get_current_context()
的源代码如下所示:
def get_current_context() -> Context:
if not _CURRENT_CONTEXT:
raise AirflowException(
"Current context was requested but no context was found! "
"Are you running within an airflow task?"
)
return _CURRENT_CONTEXT[-1]
有什么想法可能出了问题吗?
这有点棘手(对于 Airflow)。正如其他人指出的那样,重要的是要认识到“模拟是如何工作的”。但对于 Airflow 任务,情况就更加复杂了。 让我们举个例子 - 您有一些存储库
custom_repo
以及包含您的模块
daily
的文件夹 dag.py
。在这个模块中,有所有任务、DAG 定义以及顶级导入from airflow.operators.python import get_current_context
。然后,您的 custom_repo
中还有一个测试文件夹,您可以在其中测试您的任务并模拟上下文。你的方法行不通,所以你可以考虑使用类似的东西:
@pytest.fixture
def _mock_get_current_context(mocker):
mocker.patch(
"custom_repo.daily.dag.get_current_context",
return_value={},
)
但是这不会起作用,因为您不直接从
custom_repo
导入任何内容,您很可能使用
DagBag
,它允许您在存储库中发现您的dags并通过dag id(和任务id)访问不同的任务。但是所有 dag 都带有一个随机名称,这使得模拟变得混乱(在我的例子中,dag 模块名称类似于 unusual_prefix_91538somerandomid_dag
)。因此我建议使用这样的东西(注意_CURRENT_CONTEXT
是包含上下文字典的列表):
@pytest.fixture
def _mock_get_current_context(mocker):
mocker.patch.object(
airflow.operators.python,
"_CURRENT_CONTEXT",
[{}],
)
可能有更好(更正确?)的方法,但我不知道。