从 Beam 管道连接 google cloud sql postgres 实例时出现问题

问题描述 投票:0回答:2

我在连接 Google Cloud SQL 上的 Postgresql 实例时遇到一些问题,想要寻求帮助。我不确定解决方案是否是启动连接引擎或类似的东西,但这是我的问题。我的代码如下

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username= USERNAME,
    password= PASSWORD,
    database= DB-NAME,
    create_if_missing=True,
)

table_config = relational_db.TableConfiguration(
            name=TABLE-NAME,
            create_if_missing=False,
            primary_key_columns=["key"],
            create_insert_f=FUNCTION,
        )

with beam.Pipeline(options= pipeline_options) as pipeline:    
update_pipe= (
    pipeline
    | 'QueryTable' >> beam.io.ReadFromBigQuery(
        table= TABLE)
    | 'UPDATE DB' >> relational_db.Write(source_config=source_config, table_config=table_config)
)

运行这样的代码会导致以下错误:

RuntimeError: sqlalchemy.exc.InterfaceError: (pg8000.exceptions.InterfaceError) Can't create a connection to host localhost and port 5432 (timeout is None and source_address is None).

我已经阅读了 stackoverflow 上的文档和一些相关问题,其中我看到了一些建议,例如私有 IP 连接,或尝试使用 Gcloud CLI 进行身份验证,以及其他此类内容。但我的困惑与以下内容有关。如果我使用 sqlalchemy 而不尝试在 apache beam 管道中实现它,我不会得到相同的连接拒绝。其中一个参数明确定义要查找的 IP 类型为 Public

import sys  
import sqlalchemy
from google.cloud.sql.connector import Connector, IPTypes


# initialize Python Connector object
connector = Connector()

# Python Connector database connection function
def getconn():
    conn = connector.connect(
        CLOUD-SQL-CONNECTION-NAME, # Cloud SQL Instance Connection Name
        "pg8000",
        user=USERNAME,
        password=PASSWORD,
        db=DB-NAME,
        ip_type= IPTypes.PUBLIC  # IPTypes.PRIVATE for private IP
    )
    return conn

# create connection pool with 'creator' argument to our connection object function
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

# interact with Cloud SQL database using connection pool
with pool.connect() as db_conn:
   
    result = db_conn.execute(sqlalchemy.text("SELECT * from users_copy LIMIT 10")).fetchall()

所以我的问题是,有没有办法将连接/引擎包含在 Beam 管道中以避免错误?或者我是否需要更改源配置参数以包含 Cloud SQL 实例连接名称?

感谢您的帮助和阅读我的问题。

python postgresql apache-beam google-cloud-sql
2个回答
0
投票

我也有同样的问题。你能解决吗?


0
投票

您是否已锁定使用

beam_nuggets.io
库?

如果是,那么遗憾的是您不能使用 Cloud SQL Python Connector

看一下 beam-nuggets 代码,它在底层使用了 SQLAlchemy,可供 Python 连接器使用,但它纯粹使用数据库 URL/URI 来配置数据库引擎。 Cloud SQL Python 连接器需要利用 SQLAlchemy 的

create_engine
方法和
creator
参数。 Beam 块必须添加对
creator
参数的支持才能支持 Cloud SQL Python 连接器。

Cloud SQL Python 连接器的替代方案是尝试将 Cloud SQL 代理 与您的 Beam 应用程序一起部署,以便您的初始代码片段可以连接到

localhost

© www.soinside.com 2019 - 2024. All rights reserved.