我正在尝试为使用 Airflow TaskFlow API 构建的一些任务编写单元测试。我尝试了多种方法,例如,通过创建 dagrun 或仅运行任务函数,但没有任何帮助。
这是我从 S3 下载文件的任务,还有更多内容,但我在本示例中删除了它。
@task()
def updates_process(files):
context = get_current_context()
try:
updates_file_path = utils.download_file_from_s3_bucket(files.get("updates_file"))
except FileNotFoundError as e:
log.error(e)
return
# Do something else
现在我正在尝试编写一个测试用例,我可以在其中检查这个 except 子句。以下是我开始的例子
class TestAccountLinkUpdatesProcess(TestCase):
@mock.patch("dags.delta_load.updates.log")
@mock.patch("dags.delta_load.updates.get_current_context")
@mock.patch("dags.delta_load.updates.utils.download_file_from_s3_bucket")
def test_file_not_found_error(self, download_file_from_s3_bucket, get_current_context, log):
download_file_from_s3_bucket.side_effect = FileNotFoundError
task = account_link_updates_process({"updates_file": "path/to/file.csv"})
get_current_context.assert_called_once()
log.error.assert_called_once()
我还尝试创建一个 dagrun,如文档中的示例所示,并从 dagrun 中获取任务,但这也没有帮助。
我自己很难做到这一点,但我发现装饰任务有一个
.function
参数。
然后您可以使用
Task.function()
来调用实际函数。用你的例子:
class TestAccountLinkUpdatesProcess(TestCase):
@mock.patch("dags.delta_load.updates.log")
@mock.patch("dags.delta_load.updates.get_current_context")
@mock.patch("dags.delta_load.updates.utils.download_file_from_s3_bucket")
def test_file_not_found_error(self, download_file_from_s3_bucket, get_current_context, log):
download_file_from_s3_bucket.side_effect = FileNotFoundError
task = dags.delta_load.updates.updates_process
# Call the function for testing
task.function({"updates_file": "path/to/file.csv"})
get_current_context.assert_called_once()
log.error.assert_called_once()
这使您无需设置任何 DAG 基础设施,只需按预期运行 python 函数即可!
这是我能弄清楚的。不确定这是否正确,但它有效。
class TestAccountLinkUpdatesProcess(TestCase):
TASK_ID = "updates_process"
@classmethod
def setUpClass(cls) -> None:
cls.dag = dag_delta_load()
@mock.patch("dags.delta_load.updates.log")
@mock.patch("dags.delta_load.updates.get_current_context")
@mock.patch("dags.delta_load.updates.utils.download_file_from_s3_bucket")
def test_file_not_found_error(self, download_file_from_s3_bucket, get_current_context, log):
download_file_from_s3_bucket.side_effect = FileNotFoundError
task = self.dag.get_task(task_id=self.TASK_ID)
task.op_args = [{"updates_file": "file.csv"}]
task.execute(context={})
log.error.assert_called_once()
更新:根据@AetherUnbound的回答,我做了一些调查,发现我们可以使用
task.__wrapped__()
来调用实际的python函数。
class TestAccountLinkUpdatesProcess(TestCase):
@mock.patch("dags.delta_load.updates.log")
@mock.patch("dags.delta_load.updates.get_current_context")
@mock.patch("dags.delta_load.updates.utils.download_file_from_s3_bucket")
def test_file_not_found_error(self, download_file_from_s3_bucket, get_current_context, log):
download_file_from_s3_bucket.side_effect = FileNotFoundError
update_process.__wrapped__({"updates_file": "file.csv"})
log.error.assert_called_once()