如何使用SimpleHttpOperator读取上一条消息的XCom,然后决定在Airflow中执行任务2。
假设我有3个SimpleHttpsOperator任务,所有任务都返回一个XCom消息,在XCom值中它根据结果返回成功或失败。
所以在执行t2之前,我想检查t1是否成功。我的所有任务都使用SimpleHttpsOperator
t1 >> t2 >> tz
下面是我的代码的片段:
t1 = SimpleHttpOperator(
task_id='t1',
http_conn_id='http_temp',
endpoint='update_data',
method='POST',
headers={"Content-Type":"application/json"},
xcom_push=True,
log_response=True,
dag=dag,
)
t2 = SimpleHttpOperator(
task_id='t2',
http_conn_id='http_temp',
endpoint='update_data',
method='POST',
headers={"Content-Type":"application/json"},
# response_check=lambda response: True if len(response.json()) == 0 else False,
xcom_push=True,
log_response=True,
dag=dag,
你将不得不使用BranchPythonOperator。下面的依赖链中的check_t1_status
和check_t2_status
将使用BranchPythonOperator
来检查使用xcom的上一个任务的输出。并根据输出,执行下一个任务或在前一个任务失败时运行虚尾。
t1 >> check_t1_status >> t2 >> check_t2_status >> t3
check_t1_status >> t1_fail
check_t2_status >> t2_fail