Airflow - NameError: name 'ti' is not defined

Question (votes: 0, answers: 2)

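I am trying to run an Airflow script that consists of a few functions. I want to pass the value of "program_no" as an argument in a Spark submit request. The value arrives with the API call that triggers the DAG, and I read it from the context in the get_conf method. I tried passing it as {ti.xcom_pull(task_ids='parameterized_task')}, but I get the error NameError: name 'ti' is not defined. How can I solve this?
I also tried passing {prog_no} instead of {ti.xcom_pull(task_ids='parameterized_task')}, but I get the same kind of error: prog_no is not defined.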

dag = DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=default_args,
    start_date=datetime(2023, 12, 21),
    schedule_interval=None,
    description='Event based job for calculating missed sales based allowances for retroactive program setup'
)

def get_conf(**context):
    global prog_no
    #ti = context['ti']
    prog_no = context['dag_run'].conf['program_no']
    return prog_no

parameterized_task = PythonOperator(
        task_id="parameterized_task",
        python_callable=get_conf,
        provide_context=True,
        dag=dag
    )    

sparkway_request = SimpleHttpOperator(
        task_id='sparkway_request',
        endpoint=Variable.get('sparkway_api_endpoint'),
        method="POST",
        http_conn_id="SPARKWAY_CONN",
        data=json.dumps({
            "cmd": "sparkway-submit --master kubernetes --job-name spark-allowance-calculation --class com.xxx.CalculationApplication --spark-app s3a://xyz.jar --arguments SuspendedProgramStatus --num-executors 2 --executor-cores 2 --executor-memory 3G --driver-memory 3G",
            "arguments": f"{dag.latest_execution_date},{ti.xcom_pull(task_ids='parameterized_task')}",
            "type": "job"
        }),
        headers={
            "Authorization": f"Bearer {Variable.get('sparkway_token')}",
            "Content-Type": "application/json",
            "X-CSRF-TOKEN": Variable.get('sparkway_csrf_token')
        },
        response_check=lambda response: handle_sparkway_response(response),
        log_response=True,
        dag=dag
    )

parameterized_task >> sparkway_request

python airflow spark-submit
2 Answers

0 votes

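ti will not be defined there, because it can only be accessed through Jinja templating. I'm not sure whether SimpleHttpOperator supports this, but you can try the following: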

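    data=json.dumps({
        "cmd": "sparkway-submit ........",
        # get_conf returns the value, so it is stored under the default XCom key
        # ("return_value"); passing key='program_no' would find nothing.
        "arguments": "{{ execution_date }},{{ task_instance.xcom_pull(task_ids='parameterized_task') }}",
        "type": "job"
    })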
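
To illustrate why the f-string in the question fails while a plain template string does not, here is a minimal sketch using only the standard library. The function names build_eager and build_deferred are hypothetical and not part of Airflow. It assumes the operator renders the data argument with Jinja at run time; as far as I know, data is one of SimpleHttpOperator's templated fields, but check this against your provider version.

```python
import json


def build_eager():
    # f-string: Python evaluates `ti` immediately, while the DAG file is
    # being parsed, and no task instance exists at that point.
    return json.dumps(
        {"arguments": f"{ti.xcom_pull(task_ids='parameterized_task')}"}
    )


def build_deferred():
    # Plain string: the braces are kept as literal text, so a later
    # templating step can resolve `ti` when the task actually runs.
    return json.dumps(
        {"arguments": "{{ ti.xcom_pull(task_ids='parameterized_task') }}"}
    )


try:
    build_eager()
    eager_error = None
except NameError as exc:
    eager_error = str(exc)

deferred_payload = build_deferred()

print(eager_error)
print(deferred_payload)
```

Since the value originally comes from the trigger configuration, the template could probably also read it directly with {{ dag_run.conf['program_no'] }}, because dag_run is part of the template context. That would make the global prog_no variable unnecessary.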

0 votes

Accessing ti inside a Jinja template this way should work:

        data=json.dumps({
            "cmd": "sparkway-submit --master kubernetes --job-name spark-allowance-calculation --class com.xxx.CalculationApplication --spark-app s3a://xyz.jar --arguments SuspendedProgramStatus --num-executors 2 --executor-cores 2 --executor-memory 3G --driver-memory 3G",
            # Inside an f-string, Jinja's double braces must be written as four braces,
            # and the inner quotes must differ from the quotes around the string.
            "arguments": f"{dag.latest_execution_date}, {{{{ ti.xcom_pull(task_ids='parameterized_task') }}}}",
            "type": "job"
        }),
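
When an f-string is combined with Jinja, the brace escaping is easy to get wrong. The following stdlib-only sketch shows what Python emits for two and for four braces. The name run_date is a hypothetical stand-in for dag.latest_execution_date, so that the snippet runs without Airflow.

```python
import json

# Hypothetical stand-in for dag.latest_execution_date.
run_date = "2023-12-21"

# Two braces in an f-string collapse to ONE literal brace, so Jinja would
# not recognise the result as a template expression.
single = f"{run_date},{{ ti.xcom_pull(task_ids='parameterized_task') }}"

# Four braces collapse to TWO literal braces, which is what Jinja expects.
quadruple = f"{run_date},{{{{ ti.xcom_pull(task_ids='parameterized_task') }}}}"

# json.dumps leaves braces and single quotes untouched, so the template
# survives serialisation unchanged.
payload = json.dumps({"arguments": quadruple, "type": "job"})

print(single)
print(quadruple)
print(payload)
```

Using single quotes inside the template expression matters here: json.dumps escapes double quotes with a backslash, which would leave the expression in a form that Jinja cannot parse.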
