Programmatically create SSH tunnel inside of dockerized Apache Airflow python operator

I was having this same issue when opening the tunnel and trying to connect to the database in separate tasks, but got it working by doing both in the same task (Airflow doesn't persist state between tasks, since each task runs in its own process):

# Imports for Airflow 2.x; on 1.10 the hooks live under airflow.contrib.hooks / airflow.hooks
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.ssh.hooks.ssh import SSHHook


def select_from_tunnel_db():
    # Open SSH tunnel
    ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
    tunnel = ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432)
    tunnel.start()

    # Connect to DB and run query
    pg_hook = PostgresHook(
        postgres_conn_id='remote-db-conn',  # NOTE: host='localhost'
        schema='db_schema'
    )
    pg_cursor = pg_hook.get_conn().cursor()
    pg_cursor.execute('SELECT * FROM table;')
    select_val = pg_cursor.fetchall()
    return select_val


python_operator = PythonOperator(
    task_id='test_tunnel_conn',
    python_callable=select_from_tunnel_db,
    dag=dag
)
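One caveat with the snippet above is that the tunnel is never explicitly stopped. A minimal variant, sketched under the assumption of the Airflow 2.x provider import paths, uses the tunnel (an sshtunnel.SSHTunnelForwarder) as a context manager so it is shut down even if the query raises:

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.ssh.hooks.ssh import SSHHook


def select_from_tunnel_db():
    ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
    # get_tunnel() returns an sshtunnel.SSHTunnelForwarder; entering the
    # "with" block starts the tunnel and leaving it stops the tunnel again.
    with ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432):
        pg_hook = PostgresHook(postgres_conn_id='remote-db-conn', schema='db_schema')
        # The cursor context manager closes the cursor when the block exits.
        with pg_hook.get_conn().cursor() as cursor:
            cursor.execute('SELECT * FROM table;')
            return cursor.fetchall()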

This forwards traffic on local port 5432 to the same port on the remote DB host. The SSHHook requires a working SSH connection (here 'bastion-ssh-conn') to the bastion host you are tunneling through, and the PostgresHook requires a Postgres connection (here 'remote-db-conn') whose host is 'localhost' and port is 5432, so it connects through the local end of the tunnel.
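For reference, the two Airflow connections the task relies on might look roughly like the sketch below. The hostname, login, key path and credentials are hypothetical placeholders; in practice you would create these through the Airflow UI, the CLI or a secrets backend rather than in DAG code:

from airflow.models import Connection

# SSH connection to the bastion host the tunnel goes through
# (host, login and key_file are placeholder values).
bastion_ssh_conn = Connection(
    conn_id='bastion-ssh-conn',
    conn_type='ssh',
    host='bastion.example.com',
    login='tunnel_user',
    port=22,
    extra='{"key_file": "/opt/airflow/.ssh/id_rsa"}',
)

# Postgres connection pointing at the local end of the tunnel,
# i.e. host='localhost' and port=5432 (credentials are placeholders).
remote_db_conn = Connection(
    conn_id='remote-db-conn',
    conn_type='postgres',
    host='localhost',
    schema='db_schema',
    login='db_user',
    password='db_password',
    port=5432,
)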