Wong Chloe

Reputation: 45

How to get Airflow connection parameters using psycopg2

I have a Postgres connection configured in Airflow. Does anyone know how to get the schema, port, host, etc. of this connection so that I don't have to hard-code these values in my code?

The code below is what I've been trying, with no success, because I don't know how to pass the parameters. Does anyone know how to get these parameters using psycopg2?

import psycopg2
from sqlalchemy import create_engine
from airflow.operators.python import PythonOperator

def get_conn(postgres_conn, **kwargs):
    # URL layout: user:password@host:port/dbname
    conn_string = 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(
        <How to get login, password, host, port and dbname from Airflow here?>
    )
    engine = create_engine(conn_string)

    # psycopg2 expects `user`, not `login`; the parsed parts live on engine.url
    connection = psycopg2.connect(
        user=engine.url.username,
        password=engine.url.password,
        dbname=engine.url.database,
        host=engine.url.host,
        port=engine.url.port,
    )
    return engine

task_load = PythonOperator(
    task_id="task_id",
    python_callable=get_conn,
    op_kwargs={
        'postgres_conn': get_vars['postgres_conn']
    },
    provide_context=True,
)

Upvotes: 1

Views: 3187

Answers (1)

Elad Kalif

Reputation: 16109

You can use Airflow macros, as explained in another answer, but they're not really needed for your issue.

What you need is a psycopg2 connection object, and for that you can use PostgresHook. You mentioned you already have a PostgreSQL connection defined in Airflow, so all that's left to do is:

from airflow.providers.postgres.hooks.postgres import PostgresHook
def work_with_postgress():
    hook = PostgresHook(postgres_conn_id="postgres_conn_id")
    conn = hook.get_conn()  # returns a psycopg2 connection object

    # You can also run SQL directly with the hook
    hook.run(sql="UPDATE my_table SET my_col = 'value'")
    df = hook.get_pandas_df("SELECT * FROM my_table")  # returns a pandas DataFrame
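If you do want the individual values (for example to build a SQLAlchemy URL yourself), note that Airflow's Postgres connection fields do not share names with psycopg2's keyword arguments: Airflow's `login` is psycopg2's `user`, and Airflow's `schema` holds the database name that psycopg2 expects as `dbname`. The sketch below shows that mapping on a plain stand-in object; `ConnFields`, `to_psycopg2_kwargs` and `to_sqlalchemy_url` are hypothetical names, and in a real DAG you would pass the `Connection` returned by `BaseHook.get_connection("postgres_conn_id")` (from `airflow.hooks.base`) instead.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote_plus

DEFAULT_PG_PORT = 5432  # PostgreSQL's default port, used when the connection has none


@dataclass
class ConnFields:
    """Hypothetical stand-in exposing the same attribute names as an Airflow Connection."""
    host: str
    login: str
    password: str
    schema: str
    port: Optional[int] = None


def to_psycopg2_kwargs(conn) -> dict:
    """Rename Airflow connection fields to psycopg2.connect() keyword arguments."""
    return {
        "user": conn.login,        # Airflow "login"  -> psycopg2 "user"
        "password": conn.password,
        "host": conn.host,
        "port": conn.port or DEFAULT_PG_PORT,
        "dbname": conn.schema,     # Airflow "schema" -> psycopg2 "dbname"
    }


def to_sqlalchemy_url(conn) -> str:
    """Fill the question's URL template, percent-encoding the credentials."""
    return "postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}".format(
        quote_plus(conn.login),     # '@', ':' and '/' would otherwise break the URL
        quote_plus(conn.password),
        conn.host,
        conn.port or DEFAULT_PG_PORT,
        conn.schema,
    )
```

With a real connection this would become `psycopg2.connect(**to_psycopg2_kwargs(BaseHook.get_connection("postgres_conn_id")))`, although `hook.get_conn()` already does essentially this for you (plus handling of the connection's extras).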

If for some reason you can't use the hook and want to work with psycopg2 directly rather than through PostgresHook, then you have to use macros and place the code inside the Python callable (I don't recommend doing that!):

from airflow.operators.python import PythonOperator

def work_with_postgress(**kwargs):
    import psycopg2
    from sqlalchemy import create_engine

    conn_string = 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(...)
    engine = create_engine(conn_string)

    # psycopg2 expects `user`, not `login`; the parsed parts live on engine.url
    connection = psycopg2.connect(
        user=engine.url.username,
        password=engine.url.password,
        dbname=engine.url.database,
        host=engine.url.host,
        port=engine.url.port
    )

task_load = PythonOperator(
    task_id="task_id",
    python_callable=work_with_postgress,
    op_kwargs={
        # You can pass here the macros and they will be available in the callable

    },
)
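For the macro route, one possible shape, assuming Airflow 2.2 or later (where templates can read a connection as `{{ conn.<conn_id>.<field> }}`), is to fill `op_kwargs` with entries such as `"host": "{{ conn.postgres_conn_id.host }}"` and `"port": "{{ conn.postgres_conn_id.port }}"`, plus matching `login`, `password` and `schema` entries. PythonOperator renders `op_kwargs` as a template before calling the function, and by default the rendered values are strings. The callable below, `build_pg_kwargs`, is a hypothetical sketch of the receiving side; the `psycopg2.connect` call is left as a comment so it runs without a database.

```python
DEFAULT_PG_PORT = 5432  # PostgreSQL's default port


def build_pg_kwargs(host, port, login, password, schema, **context):
    """Hypothetical PythonOperator callable fed by templated op_kwargs.

    Templated values are rendered to strings, so the port is converted here.
    A connection without a port renders as "None" (or may be empty), so fall
    back to the PostgreSQL default in that case. Extra context keys are ignored.
    """
    if port in (None, "", "None"):
        port_num = DEFAULT_PG_PORT
    else:
        port_num = int(port)
    kwargs = {
        "user": login,       # Airflow "login"  -> psycopg2 "user"
        "password": password,
        "host": host,
        "port": port_num,
        "dbname": schema,    # Airflow "schema" -> psycopg2 "dbname"
    }
    # In the real task you would call psycopg2.connect(**kwargs) here.
    return kwargs
```

The return value is only there so the sketch can be checked; in a real task avoid returning credentials, because PythonOperator pushes the callable's return value to XCom by default.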

Upvotes: 1
