在dockerized apache airflow python运算符内部以编程方式创建SSH隧道

问题描述 投票:1回答:1

当我的Docker容器内部运行apache airflow时,我的程序无法创建SSH隧道。仅在本地计算机上运行该功能可以正常工作。我有一个服务器列表,可用于创建隧道,查询数据库并关闭连接。通常,我会通过以下方式进行操作:

for server in servers:
    server_conn = sshtunnel.SSHTunnelForwarder(
        server,
        ssh_username=ssh_user,
        ssh_password=ssh_password,
        remote_bind_address=(localhost, db_port),
        local_bind_address=(localhost, localport)
    )

这可以按预期工作,我可以在那里做我需要做的任何事情。但是,在Docker中,它不起作用。我意识到docker运行并绑定到端口,并且实际上不在主机系统之外,因此我使用network_mode="host"来缓解此问题。但是,这不起作用,因为我的容器失去了相互通信的能力。这是我的docker-compose文件

    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow
            - PGDATA=/var/lib/postgresql/data/pgdata
        volumes:
            - ~/.whale/pgdata:/var/lib/postgresql/data/pgdata
            - ./dags/docker/sql/create.sql:/docker-entrypoint-initdb.d/init.sql
        ports:
          - "5432:5432"

    webserver:
        image: hawk
        build:
            context: .
            dockerfile: ./dags/docker/Dockerfile-airflow
        restart: always
        depends_on:
            - postgres
            # - redis
        environment:
            - LOAD_EX=n
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Local
        volumes:
            - ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
            - "52023:22"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

我也按照here的说明进行操作,直到可以将docker exec放入我的容器中,并手动键入上述python代码段并获得有效的连接。另外,我已经阅读了气流文档here,该文档涵盖SSH连接操作符,但是这些操作符仅支持bash命令,因此我需要运行python函数。我真的很困惑,为什么在exec-进入系统时python代码会起作用,但是当我通过气流DAG运行它时却不能。目前,我无法手动放置所有连接,因为一旦部署此系统,连接数量将超过100。任何帮助将不胜感激。如果需要更多深度,请让我知道。

python docker docker-compose airflow ssh-tunnel
1个回答
0
投票

打开隧道并尝试在单独的任务中连接数据库时,我遇到了同样的问题,但是通过在同一任务中同时执行这两个任务来使它正常工作(在任务运行之间,Airflow不会保持状态):

def select_from_tunnel_db():
    # Open SSH tunnel
    ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
    tunnel = ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432)
    tunnel.start()

    # Connect to DB and run query
    pg_hook = PostgresHook(
        postgres_conn_id='remote-db-conn',  # NOTE: host='localhost'
        schema='db_schema'
    )
    pg_cursor = pg_hook.get_conn().cursor()
    pg_cursor.execute('SELECT * FROM table;')
    select_val = pg_cursor.fetchall()

    return select_val


python_operator = PythonOperator(
    task_id='test_tunnel_conn',
    python_callable=select_from_tunnel_db,
    dag=dag
)

这会将端口5432上的流量从本地计算机转发到远程数据库主机上的同一端口。 SSHHook需要与要通过其进行隧道传输的端点建立有效的ssh连接,而PostgresHook需要在端口5432上与“本地主机”建立postgres连接。

© www.soinside.com 2019 - 2024. All rights reserved.