我有一个气流管道,可以对 GCP cloud sql 上的 postgresql 数据库执行 UPSERT。现在,在我在本地数据库上使用 PostgresOperator 之前一切都工作正常,但需要使用云 sql 将我的 DAG 执行移动到 Cloud Composer。我遵循了一组相当复杂的步骤,以允许通过 Cloud Composer 的 kubernetes 集群中的 pod 连接到我的数据库。我可以确认这是有效的!我有一些操作员成功在数据库中创建表。不过,当我尝试从 CloudSqlExecuteQueryOperator 取回值时,我的问题出现了。
我有以下模板化 SQL 命令...
WITH input_rows(plate_name, plate_type, number_wells) AS (
VALUES
{% for row in ti.xcom_pull(task_ids='get_message_rows', key='return_value') %}
('{{ row['plate'] }}', 'culture', 384) {{ ",\n" if not loop.last else "" }}
{% endfor %}
)
, ins AS (
INSERT INTO plate (plate_name, plate_type, number_wells)
SELECT * FROM input_rows
ON CONFLICT (plate_name) DO NOTHING
RETURNING plate_id, plate_name, plate_type, number_wells
)
SELECT 'i' AS source
, plate_id, plate_name, plate_type, number_wells
FROM ins
UNION ALL
SELECT 's' AS source
, c.plate_id, input_rows.plate_name, input_rows.plate_type, input_rows.number_wells
FROM input_rows
JOIN plate c USING (plate_name);
有问题的运算符会像这样执行...
write_plates = CloudSQLExecuteQueryOperator(
task_id="write_plates",
sql=sql_plate_template,
gcp_cloudsql_conn_id=connection_name, # environment variable AIRFLOW_CONN_<conn_id>
)
# this operator fails because it is getting a NoneType
formatted_results = some_python_operator_with_decoration(write_plates.output)
write_plates >> formatted_results
我看到表中插入的行,当我手动运行此查询时,我得到了预期的返回值。 CloudSqlExecuteQueryOperator 是否根本没有能力返回到 XCOM?我应该将自动提交设置为 true 吗?我需要做些什么吗?我希望得到类似于 PostgresOperator 的返回,您可以像这样迭代行和列......
@task
def some_python_operator_with_decoration(postgres_results)
results = []
for row in postgres_results:
results.append({'source': row[0], "id": row[1], "name": row[2]})
return results
总长:
不要尝试与运行 cloud_sql_proxy 的 pod 建立数据库连接。要么尝试建立直接连接,要么在没有得到任何返回值的情况下完成。
长版:
所以文档说 CloudSqlExecuteQueryOperator 不返回任何内容。这是一个巨大的失望,因为我依赖于从数据库中获取递增的 ID。
接下来,我开始尝试是否可以使用 postgresOperator 连接到云 SQL。在遵循中等文章(https://medium.com/nerd-for-tech/connecting-gcp-composer-to-cloud-sql-via-proxy-305743a388a)中的大部分步骤后,我部署了一个 Pod正在运行 cloud_sql_proxy。我将此工作负载公开为 kubernetes 集群中的服务(集群 IP)。
在 Airflow 中,我转到管理 > 连接并尝试添加 postgres 连接。我尝试提供 cloud_sql_proxy 服务的集群 IP 和端口,但令我失望的是,“service_name.namespace.svc.cluster.local”或集群 IP 的任何变化似乎都不起作用。不过我可以看到该 Pod 工作正常。我可以通过笔记本电脑上的这个 Pod 连接到数据库。所以我得出的结论是正在运行的气流容器有问题。我仍然不知道它是什么,但我无法摆弄它,因为它是谷歌的作曲家环境......
这导致了我的工作。我最终使用 python 中的 uuid 库生成唯一 ID 并将它们输入到数据库中,而不需要它返回 ID。我必须更改我的数据表,因此它们标准化程度较低,但是嘿,它有效。