我已经实现了一个任务组,预计可以在多个 DAG 之间重用,其中以映射方式利用它更有意义。这是我的任务组的完整代码:
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook, SSHHook
def DeliveryGroup(group_id: str, file:str, deliver_args:dict, **kwargs) -> TaskGroup:
with TaskGroup(group_id=group_id, **kwargs) as tg:
# select destination type
selector_task = BranchPythonOperator(
task_id='destination_selector',
python_callable=lambda: f"{deliver_args.get('type')}"
)
email_task = EmailOperator(
task_id="email",
to=deliver_args.get('to'),
subject=deliver_args.get('subject'),
cc=deliver_args.get('cc'),
html_content=deliver_args.get('body'),
files=[file]
)
sftp_task = SFTPOperator(
task_id="sftp",
# ssh_conn_id='YinzCam-Connection',
sftp_hook=SFTPHook(
remote_host=deliver_args.get('host'),
username=deliver_args.get('username'),
password=deliver_args.get('password'),
port=deliver_args.get('port', 22)),
local_filepath=[file],
remote_filepath=[deliver_args.get('path')]
)
selector_task >> [email_task, sftp_task]
return tg
接下来我要做的是将代表不同目的地的字典列表作为该任务组的扩展参数传递。
task3 = DeliveryGroup.partial(
group_id='deliver',
file = "my_file.csv",
).expand(
args=dag.default_args.get('destinations') # a list of dicts
)
但是,我收到了这个错误:
AttributeError: 'function' object has no attribute 'partial'
。那么在不使用装饰器的情况下编写任务组映射的正确方法是什么?
语法指南、参考文献
问题:
是您案例中的一个函数(不是DeliveryGroup
,不是instance
等)module
所以它是这样的:
def my_func():
return True
# my_func is a function. the function knows nothing about partial()
my_func.partial() # AttributeError: 'function' object has no attribute 'partial'
如果我正确理解你需要什么,这里有一个例子:
def create_group(group_id: str, file: str, task_params: list[dict], group_params: dict = None) -> TaskGroup:
group_params = group_params or {}
with TaskGroup(group_id=group_id, **group_params) as group:
# iterate through the task parameters, create operators dynamically and attach to the group
for ix, params in enumerate(task_params):
selector_task = BranchPythonOperator(
task_id=f'destination_selector_{ix}',
python_callable=lambda: f"{params.get('type')}"
)
email_task = EmailOperator(
task_id=f'email_{ix}',
to=params.get('to'),
subject=params.get('subject'),
cc=params.get('cc'),
html_content=params.get('body'),
files=[file],
)
sftp_task = SFTPOperator(
task_id=f'sftp_{ix}',
# ssh_conn_id='YinzCam-Connection',
sftp_hook=SFTPHook(
remote_host=params.get('host'),
username=params.get('username'),
password=params.get('password'),
port=params.get('port', 22),
),
local_filepath=[file],
remote_filepath=[params.get('path')],
)
selector_task >> [email_task, sftp_task]
return group
deliver = create_group(
group_id='deliver',
file='my_file.csv',
group_params=dict(ui_fgcolor='#E4BE4D'),
task_params=[
dict(
type='test',
to=['[email protected]'],
cc=['[email protected]'],
body='hello world',
host='host.so.com',
username='username1',
password='password1',
),
dict(
type='test2',
to=['[email protected]'],
cc=['[email protected]'],
body='hello world2',
host='host2.so.com',
username='username2',
password='password2',
),
# ...
]
)