cluis92

Reputation: 932

How to connect to postgres using a postgres connection id inside a python callable

I am using Airflow's PythonOperator to call a Python function. The error is raised inside the try/except block, at the psycopg2.connect call.

def python_callable_new():
    print("Inside python callable ...")

    import psycopg2

    try:
        print("attempting database connection from python method.. ")
        conn = psycopg2.connect('postgres_defined_connection')
        print("success. ")
    except Exception as error:
        print("failed: ")
        print(error)
    return 'End of callable. '

    

with dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")

    do_python_task = PythonOperator(
        task_id='do-py-operation',
        python_callable=python_callable_new,
    )

    extract_source_data = PostgresOperator(
        task_id='extract-cb-source-data',
        postgres_conn_id='postgres_defined_connection',
        sql='./sql_scripts/extract_csv_data.sql'
    )

    # csv_to_postgres

start_task >> do_python_task >> extract_source_data >> stop_task

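Basically, my question is: how do I connect to Postgres from inside a Python callable using the Airflow connection id postgres_defined_connection, the same way PostgresOperator does with postgres_conn_id?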

(FYI: I define postgres_defined_connection in a separate connections.py that uses a SQLAlchemy engine and PostgresHook.)

Upvotes: 2

Views: 6643

Answers (1)

Alan Ma

Reputation: 591

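psycopg2.connect does not know about Airflow connection ids; it expects the connection parameters themselves. You can pass them as a single string (a libpq connection string, or DSN) if you format them as key=value pairs separated by spaces. The string 'postgres_defined_connection' contains no "=", so libpq cannot parse it, which is why you get the missing "=" error message.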

Please refer to the psycopg2 documentation for psycopg2.connect for the full list of accepted parameters.
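To make the expected format concrete, here is a minimal sketch of a hypothetical helper, to_libpq_dsn, that turns a dict of parameters into a key=value string. It follows libpq's quoting rules: empty values or values containing spaces are wrapped in single quotes, and backslashes and single quotes inside a value are escaped with a backslash. (psycopg2 2.7+ also ships psycopg2.extensions.make_dsn for this job.)

```python
def to_libpq_dsn(params):
    """Build a libpq key=value connection string from a dict (hypothetical helper)."""
    parts = []
    for key, value in params.items():
        text = str(value)
        # Escape backslashes first, then single quotes, as libpq requires
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        # Quote values that are empty, contain whitespace, or needed escaping
        if text == "" or any(ch.isspace() for ch in text) or escaped != text:
            escaped = f"'{escaped}'"
        parts.append(f"{key}={escaped}")
    # Pairs are separated by single spaces
    return " ".join(parts)
```

For example, to_libpq_dsn({"dbname": "test", "user": "postgres", "password": "secret"}) produces "dbname=test user=postgres password=secret", which is exactly the kind of string psycopg2.connect accepts.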


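Inside Airflow, the more natural route is PostgresHook: given the id of an existing Airflow connection (created, for example, under Admin > Connections in the UI), it looks up the host, port, credentials and database for you and opens the psycopg2 connection itself.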

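# Airflow 2.x (apache-airflow-providers-postgres); on Airflow 1.10 use
# from airflow.hooks.postgres_hook import PostgresHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def execute_query_with_conn_obj(query):
    # get_conn() returns a plain psycopg2 connection built from the Airflow connection
    hook = PostgresHook(postgres_conn_id='my_connection')
    conn = hook.get_conn()
    cur = conn.cursor()
    cur.execute(query)
    conn.commit()  # psycopg2 does not autocommit; needed for INSERT/UPDATE/DDL
    cur.close()
    conn.close()

def execute_query_with_hook(query):
    # run() opens a connection, executes the SQL and commits for you
    hook = PostgresHook(postgres_conn_id='my_connection')
    hook.run(sql=query)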
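Applied to the question, a minimal sketch of python_callable_new rewritten around PostgresHook might look like this. SELECT version() is only a placeholder query, and the hook_cls parameter is a hypothetical seam added so the function can be exercised without a live Airflow install; in the DAG you leave it at its default (PythonOperator does not pass it).

```python
def python_callable_new(conn_id="postgres_defined_connection", hook_cls=None):
    """PythonOperator callable that queries Postgres through an Airflow connection id."""
    if hook_cls is None:
        # Imported lazily, as in the question (Airflow 2.x provider path)
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook_cls = PostgresHook
    print("attempting database connection from python method..")
    # The hook resolves conn_id to host, credentials and database itself
    hook = hook_cls(postgres_conn_id=conn_id)
    # get_records() opens a connection, runs the query and returns all rows
    rows = hook.get_records("SELECT version()")
    print("success:", rows[0][0])
    return 'End of callable. '
```

The only real change from the question is replacing psycopg2.connect('postgres_defined_connection') with PostgresHook(postgres_conn_id=...), which is the same connection lookup PostgresOperator performs.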

You can also call psycopg2 directly, either with keyword arguments or with a single key=value string:

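import psycopg2

def execute_query_with_psycopg(query):
    conn_args = dict(
        host='myhost',
        user='admin',
        password='password',
        dbname='my_database',  # database name (Airflow stores it in the "schema" field)
        port=5432)
    conn = psycopg2.connect(**conn_args)
    try:
        with conn, conn.cursor() as cur:  # commits on success, rolls back on error
            cur.execute(query)
    finally:
        conn.close()  # "with conn" ends the transaction but does not close the connection

def execute_query_with_psycopg_string(query):
    # Same parameters, given as a space-separated key=value string
    conn = psycopg2.connect("dbname=test user=postgres password=secret")
    try:
        with conn, conn.cursor() as cur:
            cur.execute(query)
    finally:
        conn.close()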
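If you specifically want raw psycopg2 but still keep the credentials in the Airflow connection, one possible sketch is to fetch the connection with BaseHook.get_connection and map its fields onto psycopg2.connect keyword arguments. The helper psycopg2_kwargs_from_airflow_conn and the callable python_callable_raw_psycopg2 are hypothetical names, and the mapping deliberately ignores the connection's "extra" JSON (sslmode and similar), which PostgresHook would otherwise handle for you.

```python
def psycopg2_kwargs_from_airflow_conn(conn):
    """Translate an Airflow Connection's fields into psycopg2.connect() kwargs."""
    kwargs = {
        "host": conn.host,
        "port": conn.port,
        "user": conn.login,        # Airflow names the user field "login"
        "password": conn.password,
        "dbname": conn.schema,     # for Postgres, Airflow's "schema" is the database name
    }
    # Leave out unset fields so libpq falls back to its own defaults
    return {key: value for key, value in kwargs.items() if value not in (None, "")}


def python_callable_raw_psycopg2():
    # Imported inside the callable, as in the question; Airflow 2.x path
    # (Airflow 1.10: from airflow.hooks.base_hook import BaseHook)
    import psycopg2
    from airflow.hooks.base import BaseHook

    airflow_conn = BaseHook.get_connection("postgres_defined_connection")
    conn = psycopg2.connect(**psycopg2_kwargs_from_airflow_conn(airflow_conn))
    try:
        with conn, conn.cursor() as cur:
            cur.execute("SELECT 1")
            print("success:", cur.fetchone())
    finally:
        conn.close()
    return 'End of callable. '
```

In most DAGs PostgresHook is still the simpler choice, since it already performs this mapping in get_conn().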

Upvotes: 6
