我有一个如下所示的函数:
def my_funct(configuration, **context):
    """Create an empty temporary file and publish its path through XCom.

    The file is created inside ``os.path.join(FILE_PATH, configuration["id"])``
    with ``delete=False``, so it stays on disk after it is closed and the
    pushed path remains usable by whoever reads the XCom value.

    Args:
        configuration: Mapping providing the ``"id"`` key, which names the
            sub-directory of ``FILE_PATH`` the file is created in.
        **context: Keyword arguments; must contain ``"task_instance"``, the
            object whose ``xcom_push`` method receives the file path.

    Raises:
        KeyError: If ``configuration`` lacks ``"id"`` or ``context`` lacks
            ``"task_instance"``.
    """
    # Resolve the task instance from the keyword context up front: the name
    # is not bound anywhere else in this function, and failing here avoids
    # leaving a stray temporary file behind when the key is missing.
    task_instance = context["task_instance"]

    my_file_path = os.path.join(FILE_PATH, configuration["id"])

    # Only the generated name is needed, so the handle is closed right away.
    f = NamedTemporaryFile("w", delete=False, dir=my_file_path)
    formatted_data_output_file = f.name
    f.close()

    task_instance.xcom_push(
        key="formatted_data_output_file",
        value=formatted_data_output_file,
    )
我正在尝试测试 xcom_push 的键/值,但不知道该如何模拟(mock)NamedTemporaryFile。
我有以下测试:
@patch("my_script.NamedTemporaryFile")
def test_xcom_push_vlaue(self, mock_tmp_file):
    """Check the key/value that ``my_funct`` hands to ``xcom_push``.

    ``NamedTemporaryFile`` is patched in ``my_script`` and the task
    instance's ``xcom_pull`` / ``xcom_push`` are replaced with mocks, so the
    assertion only inspects the arguments of the single ``xcom_push`` call.
    """
    # Sets ``name`` on the object produced by entering the mock as a context
    # manager, i.e. the ``with NamedTemporaryFile(...) as f`` path.
    mock_tmp_file.return_value.__enter__.return_value.name = "name"

    mock_dag = DAG(dag_id="mock_dag", start_date=now())
    dummy_task = DummyOperator(dag=mock_dag, task_id="test_task")
    ti = TaskInstance(dummy_task, execution_date=now())
    ctx = {"dag": mock_dag, "task_instance": ti}

    with patch.object(ti, "xcom_pull", side_effect=[self.configuration]), \
            patch.object(ti, "xcom_push") as push_spy:
        my_funct(configuration={}, **ctx)

        expected_value = "/my/file/path/%s" % self.configuration["id"] + "/name"
        push_spy.assert_called_once_with(
            key="formatted_data_output_file",
            value=expected_value,
        )
运行测试时出现以下错误:
TypeError: expected str, bytes or os.PathLike object, not MagicMock
我是 mock(模拟)方面的新手,没能弄清楚问题出在哪里,任何帮助将不胜感激。
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
p = <MagicMock name='NamedTemporaryFile().name' id='139929782915024'>

    def dirname(p):
        """Returns the directory component of a pathname"""
>       p = os.fspath(p)
E       TypeError: expected str, bytes or os.PathLike object, not MagicMock
/usr/local/lib/python3.7/posixpath.py:156: TypeError
-------------- generated xml file: /opt/airflow/test_results.xml ---------------
你为什么要修补 `__enter__`?你的代码里并没有使用 `with` 语句。
尝试替换下面这一行:
mock_tmp_file.return_value.__enter__.return_value.name = "name"
换成这一行:
mock_tmp_file.return_value.name = "name"