CloudSqlExecuteQueryOperator 不向 xcom 返回结果

问题描述 投票:0回答:1

我有一个气流管道,可以对 GCP cloud sql 上的 postgresql 数据库执行 UPSERT。现在,在我在本地数据库上使用 PostgresOperator 之前一切都工作正常,但需要使用云 sql 将我的 DAG 执行移动到 Cloud Composer。我遵循了一组相当复杂的步骤,以允许通过 Cloud Composer 的 kubernetes 集群中的 pod 连接到我的数据库。我可以确认这是有效的!我有一些操作员成功在数据库中创建表。不过,当我尝试从 CloudSqlExecuteQueryOperator 取回值时,我的问题出现了。

我有以下模板化 SQL 命令...

WITH input_rows(plate_name, plate_type, number_wells) AS (
  VALUES
    {% for row in ti.xcom_pull(task_ids='get_message_rows', key='return_value') %}
    ('{{ row['plate'] }}', 'culture', 384) {{ ",\n" if not loop.last else "" }}
    {% endfor %}
    )
, ins AS (
  INSERT INTO plate (plate_name, plate_type, number_wells)
  SELECT * FROM input_rows
  ON CONFLICT (plate_name) DO NOTHING
  RETURNING plate_id, plate_name, plate_type, number_wells
  )
SELECT 'i' AS source
    , plate_id, plate_name, plate_type, number_wells
FROM ins
UNION ALL
SELECT 's' AS source
    , c.plate_id, input_rows.plate_name, input_rows.plate_type, input_rows.number_wells
FROM input_rows
JOIN plate c USING (plate_name);

有问题的运算符会像这样执行...

write_plates = CloudSQLExecuteQueryOperator(
        task_id="write_plates",
        sql=sql_plate_template,
        gcp_cloudsql_conn_id=connection_name, # environment variable AIRFLOW_CONN_<conn_id>
    )

# this operator fails because it is getting a NoneType
formatted_results = some_python_operator_with_decoration(write_plates.output)

write_plates >> formatted_results

我看到表中插入的行,当我手动运行此查询时,我得到了预期的返回值。 CloudSqlExecuteQueryOperator 是否根本没有能力返回到 XCOM?我应该将自动提交设置为 true 吗?我需要做些什么吗?我希望得到类似于 PostgresOperator 的返回,您可以像这样迭代行和列......

@task
def some_python_operator_with_decoration(postgres_results)
    results = []
    for row in postgres_results:
        results.append({'source': row[0], "id": row[1], "name": row[2]})
    return results
airflow google-cloud-sql google-cloud-composer
1个回答
0
投票

总长:

不要尝试与运行 cloud_sql_proxy 的 pod 建立数据库连接。要么尝试建立直接连接,要么在没有得到任何返回值的情况下完成。

长版:

所以文档说 CloudSqlExecuteQueryOperator 不返回任何内容。这是一个巨大的失望,因为我依赖于从数据库中获取递增的 ID。

接下来,我开始尝试是否可以使用 postgresOperator 连接到云 SQL。在遵循中等文章(https://medium.com/nerd-for-tech/connecting-gcp-composer-to-cloud-sql-via-proxy-305743a388a)中的大部分步骤后,我部署了一个 Pod正在运行 cloud_sql_proxy。我将此工作负载公开为 kubernetes 集群中的服务(集群 IP)。

在 Airflow 中,我转到管理 > 连接并尝试添加 postgres 连接。我尝试提供 cloud_sql_proxy 服务的集群 IP 和端口,但令我失望的是,“service_name.namespace.svc.cluster.local”或集群 IP 的任何变化似乎都不起作用。不过我可以看到该 Pod 工作正常。我可以通过笔记本电脑上的这个 Pod 连接到数据库。所以我得出的结论是正在运行的气流容器有问题。我仍然不知道它是什么,但我无法摆弄它,因为它是谷歌的作曲家环境......

这导致了我的工作。我最终使用 python 中的 uuid 库生成唯一 ID 并将它们输入到数据库中,而不需要它返回 ID。我必须更改我的数据表,因此它们标准化程度较低,但是嘿,它有效。

© www.soinside.com 2019 - 2024. All rights reserved.