当我的Docker容器内部运行apache airflow时,我的程序无法创建SSH隧道。仅在本地计算机上运行该功能可以正常工作。我有一个服务器列表,可用于创建隧道,查询数据库并关闭连接。通常,我会通过以下方式进行操作:
for server in servers:
server_conn = sshtunnel.SSHTunnelForwarder(
server,
ssh_username=ssh_user,
ssh_password=ssh_password,
remote_bind_address=(localhost, db_port),
local_bind_address=(localhost, localport)
)
这可以按预期工作,我可以在那里做我需要做的任何事情。但是,在Docker中,它不起作用。我意识到docker运行并绑定到端口,并且实际上不在主机系统之外,因此我使用network_mode="host"
来缓解此问题。但是,这不起作用,因为我的容器失去了相互通信的能力。这是我的docker-compose文件
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
- PGDATA=/var/lib/postgresql/data/pgdata
volumes:
- ~/.whale/pgdata:/var/lib/postgresql/data/pgdata
- ./dags/docker/sql/create.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
webserver:
image: hawk
build:
context: .
dockerfile: ./dags/docker/Dockerfile-airflow
restart: always
depends_on:
- postgres
# - redis
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Local
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
- "52023:22"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
我也按照here的说明进行操作,直到可以将docker exec
放入我的容器中,并手动键入上述python代码段并获得有效的连接。另外,我已经阅读了气流文档here,该文档涵盖SSH连接操作符,但是这些操作符仅支持bash命令,因此我需要运行python函数。我真的很困惑,为什么在exec
-进入系统时python代码会起作用,但是当我通过气流DAG运行它时却不能。目前,我无法手动放置所有连接,因为一旦部署此系统,连接数量将超过100。任何帮助将不胜感激。如果需要更多深度,请让我知道。
打开隧道并尝试在单独的任务中连接数据库时,我遇到了同样的问题,但是通过在同一任务中同时执行这两个任务来使它正常工作(在任务运行之间,Airflow不会保持状态):
def select_from_tunnel_db():
# Open SSH tunnel
ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
tunnel = ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432)
tunnel.start()
# Connect to DB and run query
pg_hook = PostgresHook(
postgres_conn_id='remote-db-conn', # NOTE: host='localhost'
schema='db_schema'
)
pg_cursor = pg_hook.get_conn().cursor()
pg_cursor.execute('SELECT * FROM table;')
select_val = pg_cursor.fetchall()
return select_val
python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
dag=dag
)
这会将端口5432上的流量从本地计算机转发到远程数据库主机上的同一端口。 SSHHook需要与要通过其进行隧道传输的端点建立有效的ssh连接,而PostgresHook需要在端口5432上与“本地主机”建立postgres连接。