Task mapping over a task group without using decorators


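I have implemented a task group that is meant to be reused across multiple DAGs, and in some of them it makes more sense to use it in a mapped fashion. Here is the full code of my task group: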

from airflow.utils.task_group import TaskGroup
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook, SSHHook

def DeliveryGroup(group_id: str, file:str, deliver_args:dict, **kwargs) -> TaskGroup:
    with TaskGroup(group_id=group_id, **kwargs) as tg:
        # select destination type
        selector_task = BranchPythonOperator(
            task_id='destination_selector',
            python_callable=lambda: f"{deliver_args.get('type')}"
        )
        
        email_task = EmailOperator(
            task_id="email",
            to=deliver_args.get('to'),
            subject=deliver_args.get('subject'),
            cc=deliver_args.get('cc'),
            html_content=deliver_args.get('body'),
            files=[file]
        )

        sftp_task = SFTPOperator(
            task_id="sftp",
            # ssh_conn_id='YinzCam-Connection',
            sftp_hook=SFTPHook(
                remote_host=deliver_args.get('host'),
                username=deliver_args.get('username'),
                password=deliver_args.get('password'),
                port=deliver_args.get('port', 22)),
            local_filepath=[file],
            remote_filepath=[deliver_args.get('path')]
        )

        selector_task >> [email_task, sftp_task]

        return tg

What I want to do next is pass a list of dicts, each representing a different destination, as the expand argument of this task group.

task3 = DeliveryGroup.partial(
                group_id='deliver',
                file = "my_file.csv",
            ).expand(
                args=dag.default_args.get('destinations') # a list of dicts
            )

However, I get this error:

AttributeError: 'function' object has no attribute 'partial'

So what is the correct way to write task group mapping without using decorators?


airflow operators airflow-2.x airflow-taskflow
1 Answer

The problem: DeliveryGroup is a function in your case (not an instance, not a module, etc.).

So it behaves like this:

def my_func():
    return True

# my_func is a function. the function knows nothing about partial()
my_func.partial()  # AttributeError: 'function' object has no attribute 'partial'
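
For comparison, here is a minimal sketch in plain Python, with no Airflow involved. In Airflow, .partial() and .expand() belong to operator classes and, as far as I know, to the objects returned by the TaskFlow decorators, not to ordinary functions. The closest standard-library analogue for pre-binding shared arguments is functools.partial combined with an ordinary loop. Note that this is not dynamic task mapping: everything is built when the file is parsed. The function describe_delivery and the destination dicts are hypothetical stand-ins, not part of the original code.

```python
from functools import partial


def describe_delivery(group_id: str, file: str, deliver_args: dict) -> str:
    """Hypothetical stand-in for a function that builds one delivery group."""
    return f"{group_id}: {file} -> {deliver_args['type']}"


# A plain function has no .partial attribute, which is what the traceback reports.
has_partial = hasattr(describe_delivery, "partial")

# functools.partial pre-binds the argument shared by every destination.
bound = partial(describe_delivery, file="my_file.csv")

# Hypothetical destinations; an ordinary loop supplies the per-destination arguments.
destinations = [{"type": "email"}, {"type": "sftp"}]
results = [
    bound(group_id=f"deliver_{ix}", deliver_args=dest)
    for ix, dest in enumerate(destinations)
]
```

Each element of results corresponds to one call of the underlying function, which is the same shape as the loop-based approach used in the example further down.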

If I understand correctly what you need, here is an example:

def create_group(group_id: str, file: str, task_params: list[dict], group_params: dict = None) -> TaskGroup:
    group_params = group_params or {}
    
    with TaskGroup(group_id=group_id, **group_params) as group:
        # iterate through the task parameters, create operators dynamically and attach to the group
        for ix, params in enumerate(task_params):
            selector_task = BranchPythonOperator(
                task_id=f'destination_selector_{ix}',
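                # bind the current dict as a default argument; a bare closure would only see the last loop value
                python_callable=lambda dest=params: f"{dest.get('type')}"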
            )

            email_task = EmailOperator(
                task_id=f'email_{ix}',
                to=params.get('to'),
                subject=params.get('subject'),
                cc=params.get('cc'),
                html_content=params.get('body'),
                files=[file],
            )

            sftp_task = SFTPOperator(
                task_id=f'sftp_{ix}',
                # ssh_conn_id='YinzCam-Connection',
                sftp_hook=SFTPHook(
                    remote_host=params.get('host'),
                    username=params.get('username'),
                    password=params.get('password'),
                    port=params.get('port', 22),
                ),
                local_filepath=[file],
                remote_filepath=[params.get('path')],
            )

            selector_task >> [email_task, sftp_task]

        return group


deliver = create_group(
    group_id='deliver',
    file='my_file.csv',
    group_params=dict(ui_fgcolor='#E4BE4D'),
    task_params=[
        dict(
            type='test',
            to=['[email protected]'],
            cc=['[email protected]'],
            body='hello world',
            host='host.so.com',
            username='username1',
            password='password1',
        ),
        dict(
            type='test2',
            to=['[email protected]'],
            cc=['[email protected]'],
            body='hello world2',
            host='host2.so.com',
            username='username2',
            password='password2',
        ),
        # ...
    ]
)
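
Two details are worth checking when building branch callables in a loop like this. First, a lambda created inside a loop looks up the loop variable when it is called, not when it is defined, so every callable can end up seeing the last element. Second, a branch callable is expected to return the task_id to follow, and tasks inside a TaskGroup created with the default prefix_group_id=True get ids of the form group_id.task_id. The sketch below is plain Python and assumes, hypothetically, that each destination's type is either 'email' or 'sftp', matching the task-id prefixes used above. The helper make_selector is an illustrative name, not an Airflow API.

```python
from typing import Callable


def make_selector(group_id: str, ix: int, params: dict) -> Callable[[], str]:
    """Return a callable that yields the fully qualified task_id to branch to.

    A factory function gives each callable its own params, avoiding the
    late-binding behavior of closures defined directly in a loop.
    """
    def select() -> str:
        # Assumes params['type'] is 'email' or 'sftp' and the group prefixes child ids.
        return f"{group_id}.{params['type']}_{ix}"
    return select


# Hypothetical destinations for illustration.
destinations = [{"type": "email"}, {"type": "sftp"}]

# Late binding: every lambda reads `params` at call time, after the loop has finished.
late_bound = [lambda: params["type"] for params in destinations]
late_values = [f() for f in late_bound]

# The factory captures each dict separately.
selectors = [make_selector("deliver", ix, p) for ix, p in enumerate(destinations)]
selected = [s() for s in selectors]
```

Passing the result of make_selector(group_id, ix, params) as python_callable is one way to make each branch point at its own email or sftp task.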