Airflow BigQueryInsertJobOperator 手动调用时不会渲染 jinja2 模板文件

问题描述 投票:0回答:1

我尝试在 PythonOperator 中手动调用 BigQueryInsertJobOperator,如下所示,但未生成 SQL 模板:

def some_func(**kwargs):
# ...
append_tmp_to_raw_and_clean_after = BigQueryInsertJobOperator(
    task_id="append_tmp_to_raw",
    gcp_conn_id=GCP_CONN_ID,
    configuration={
        "query": {
            "query": '{% include "sql/merge_stage_raw_vehicle_position.sql" %}',
            "useLegacySql": False,
        }
    },
    params=params,
)
append_tmp_to_raw_and_clean_after.execute(context=kwargs)
# ....

如果我将它用作 DAG 的一部分,它可以正常工作,但是当手动调用运算符时,它会创建一个字面意思为“{% include "sql/merge_stage_raw_vehicle_position.sql" %}”的查询(根据 BigQuery 日志)。这意味着模板渲染尚未完成。

我想也许我需要调用一些方法来显式渲染 SQL,但是在查看 BigQueryInsertJobOperator 的源代码后,我只找到了“prepare_template”,并且它没有使用 Jinja2 等

在这一点上,我认为 Airflow 作为 DAG 的一部分发挥了一些魔力来渲染 Jinja 模板并替换 SQL 查询,但因为我在 PythonOperator 中手动执行此操作,所以它不起作用。

我知道我可以手动渲染 Jinja2 模板并将其作为字符串传递,但我很好奇我是否忘记调用一些未知的方法来按原样渲染 SQL。这将是理想的。

非常感谢。

气流版本:2.8.0

google-bigquery airflow
1个回答
0
投票

我认为 Airflow 作为 DAG 的一部分发挥了一些魔力,可以渲染 Jinja 模板并替换 SQL 查询

这是在工作人员上启动任务以执行的顺序。步骤之一是渲染 Jinja 字符串。 运算符内运算符的模式是不好的做法。类名被称为“operator”这一事实只是一种约定。您还可以创建名为 Foo 的运算符:

Class Foo(BaseOperator):
   ...

启动一个任务的顺序有很多步骤,当你在operator内部调用operator时,你需要自己处理所有这些步骤,因为Airflow不知道内部类是operator。调度程序不会查看 Python 可调用的内部。 我在https://stackoverflow.com/a/65168908/14624409

中更详细地解释了它

您没有解释您想要什么,但以下之一可能是避免不良模式的替代方法:

  1. 使用PythonOperator内的相关钩子来满足功能需求。
  2. 创建自定义运算符来端到端处理所有所需的功能(从而避免需要用
    BigQueryInsertJobOperator
    包装
    PythonOperator

Airflow 社区正在讨论如何解决这个问题(请参阅邮件列表主题)我已经在主题中提出了这个用例,并指出用户期望操作符内部操作符的模式能够简单地开箱即用。现在说会产生什么结果还为时过早,但目前我建议的替代方案之一应该可以正常工作。

© www.soinside.com 2019 - 2024. All rights reserved.