如何使用 AWS 联合令牌 json 连接到 Airflow 中的 Bigquery?

问题描述 投票:0回答:1

我需要构建一个从 google bigquery 读取的气流连接器,尽管托管在 aws 上。 我的 DevOps 团队创建了一个秘密 json,格式为:

{
  "type": "external_account",
  "audience": xx,
  "subject_token_type": xx,
  "service_account_impersonation_url": xx,
  "token_uri": xx,
  "credential_source": {xx, xx, xx}
}

我认为对应于https://google.aip.dev/auth/4117 他们已将秘密作为参数上传到 SSM,我尝试使用 SsmHook.getparameter_value() 提取该秘密。这可能有效,也可能无效,但我不知道如何使用它来设置似乎需要以另一种方式进行身份验证的 BigQueryGetDataOperator。 如果我只是简单地提取秘密并运行操作员,我的反应是:

airflow.exceptions.AirflowNotFoundException: The conn_id `google_cloud_default` isn't defined

考虑到这些基础设施限制,有人可以建议如何最好地继续吗?谢谢

from datetime import datetime
import pandas as pd
from google.cloud import bigquery
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.ssm import SsmHook

PROJECT = "xxx"
DATASET = "xxx"
TABLE = "xxx"


def get_ssm_parameter(parameter_name, with_decryption=True):
    hook = SsmHook()
    return hook.get_parameter_value(parameter_name, with_decryption)


def get_token():
    return get_ssm_parameter(
        parameter_name="/prod/xxx/airflow", with_decryption=True
    )


with DAG(
    "example-dag",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
    t1 = PythonOperator(
        task_id="read-bigquery",
        python_callable=get_token,
    )
    get_data = BigQueryGetDataOperator(
        task_id="get_data",
        dataset_id=DATASET,
        table_id=TABLE,
        max_results=10,
    )
    end = EmptyOperator(task_id="end")
    start >> t1 >> get_data >> end

python amazon-web-services google-bigquery airflow federated
1个回答
0
投票

因此,使用 AWS 联合令牌进行 BigQuery 访问时,您无法在 Airflow 的 UI 内部创建连接,因此不能使用 BigQueryGetDataOperator。我发现如上所述,您可以使用钩子从 ssm 检索令牌,然后将其设置为环境变量,您需要将其写入临时本地文件:

token = arbitrary_get_token_func()
with open("/temp/token.json", "w") as f:
    f.write(token)
# this needs to be a path, not the file itself
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/tmp/token.json" 
os.environ["GOOGLE_CLOUD_PROJECT"] = "{project-name}"

client = bigquery.Client()
rows = client.list_rows(f"{PROJECT}.{DATASET}.{TABLE}", max_results=xx) # to limit result size
return rows.to_dataframe() # or to_arrow() etc

© www.soinside.com 2019 - 2024. All rights reserved.