Reputation: 932
I am using Airflow's PythonOperator to call a Python function. The error occurs inside the try/except block.
def python_callable_new():
    print("Inside python callable ...")
    import psycopg2
    try:
        print("attempting database connection from python method.. ")
        conn = psycopg2.connect('postgres_defined_connection')
        print("success. ")
    except Exception as error:
        print("failed: ")
        print(error)
    return 'End of callable. '
with dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    do_python_task = PythonOperator(
        task_id='do-py-operation',
        python_callable=python_callable_new,
    )
    extract_source_data = PostgresOperator(
        task_id='extract-cb-source-data',
        postgres_conn_id='postgres_defined_connection',
        sql='./sql_scripts/extract_csv_data.sql'
    )
    # csv_to_postgres
    start_task >> do_python_task >> extract_source_data >> stop_task
Basically, my question is
(FYI: I store postgres_defined_connection in a separate connections.py that uses a SQLAlchemy engine and PostgresHook.)
Upvotes: 2
Views: 6643
Reputation: 591
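psycopg2.connect expects connection parameters. You can pass them as a single string if you format them as key/value pairs separated by spaces, for example "dbname=test user=postgres". The string 'postgres_defined_connection' is an Airflow connection ID, not a set of key/value pairs, which is why you get the error message about a missing "=".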
Please refer to the psycopg documentation for more information.
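To make the key/value format concrete, here is a minimal sketch that joins a dict of parameters into such a string. The helper name build_dsn is hypothetical and not part of psycopg2; the quoting follows the libpq rule that empty values or values containing spaces are wrapped in single quotes, with backslashes and single quotes escaped by a backslash.

```python
def build_dsn(params):
    """Join connection parameters into a 'key=value key=value' string.

    Hypothetical helper for illustration; not part of psycopg2.
    """
    parts = []
    for key, value in params.items():
        text = str(value)
        # Quote empty values and values containing spaces, quotes, or backslashes.
        if text == "" or any(ch in text for ch in " '\\"):
            text = "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"
        parts.append(f"{key}={text}")
    return " ".join(parts)


# Prints: dbname=test user=postgres password=secret
print(build_dsn({"dbname": "test", "user": "postgres", "password": "secret"}))
```

A bare name such as 'postgres_defined_connection' contains no key=value pair at all, so it cannot be read as connection parameters.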
To connect to a Postgres database in Airflow, you can leverage the PostgresHook provided you have a connection created.
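# Airflow 1.10 import path; in Airflow 2+ use:
# from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.postgres_hook import PostgresHook

def execute_query_with_conn_obj(query):
    # Get a raw psycopg2 connection from the Airflow connection ID.
    hook = PostgresHook(postgres_conn_id='my_connection')
    conn = hook.get_conn()
    cur = conn.cursor()
    cur.execute(query)
    conn.commit()  # psycopg2 does not autocommit by default

def execute_query_with_hook(query):
    # Let the hook open the connection, run the SQL, and close it.
    hook = PostgresHook(postgres_conn_id='my_connection')
    hook.run(sql=query)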
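Applied to the callable from the question, the hook approach might look like the sketch below. The function name fetch_rows and the hook_factory parameter are assumptions added for illustration (the factory only exists so the function can be exercised without a database); the connection ID is the one from the question, and the import path shown is the Airflow 2 one.

```python
def fetch_rows(sql, conn_id="postgres_defined_connection", hook_factory=None):
    """Run a query through an Airflow connection ID and return the rows.

    hook_factory is a hypothetical test seam; by default the real
    PostgresHook is imported lazily so this module loads without Airflow.
    """
    if hook_factory is None:
        # Airflow 2 import path; Airflow 1.10 uses airflow.hooks.postgres_hook.
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook_factory = PostgresHook
    hook = hook_factory(postgres_conn_id=conn_id)
    # get_records() executes the SQL and returns all rows.
    return hook.get_records(sql)


def python_callable_new():
    try:
        rows = fetch_rows("SELECT 1")
        print("success:", rows)
    except Exception as error:
        print("failed:", error)
    return "End of callable."
```

The point of the design is that the callable only ever sees the connection ID; host, user, and password stay in the Airflow connection.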
You can also do it with pure Python code.
import psycopg2

def execute_query_with_psycopg(query):
    # Pass connection parameters as keyword arguments.
    conn_args = dict(
        host='myhost',
        user='admin',
        password='password',
        dbname='my_schema',
        port=5432)
    conn = psycopg2.connect(**conn_args)
    cur = conn.cursor()
    cur.execute(query)

def execute_query_with_psycopg_string(query):
    # Pass the same kind of parameters as one key=value string.
    conn = psycopg2.connect("dbname=test user=postgres password=secret")
    cur = conn.cursor()
    cur.execute(query)
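If you want to call psycopg2.connect directly but keep the credentials in Airflow, one option is to look up the stored connection and map its fields to keyword arguments. This is only a sketch: the function names are hypothetical, the import path is the Airflow 2 one, and hook.get_conn() already does roughly this for you.

```python
def psycopg_kwargs_from_airflow(conn):
    """Map an Airflow Connection-like object to psycopg2.connect() keyword arguments."""
    kwargs = {
        "host": conn.host,
        "user": conn.login,       # Airflow calls the user name "login"
        "password": conn.password,
        "dbname": conn.schema,    # Airflow stores the database name in "schema"
        "port": conn.port,
    }
    # Drop unset fields so libpq falls back to its own defaults.
    return {key: value for key, value in kwargs.items() if value is not None}


def connect_with_airflow_conn(conn_id="postgres_defined_connection"):
    # Local imports so the mapping helper above works without Airflow installed.
    import psycopg2
    from airflow.hooks.base import BaseHook  # Airflow 1.10: airflow.hooks.base_hook

    airflow_conn = BaseHook.get_connection(conn_id)
    return psycopg2.connect(**psycopg_kwargs_from_airflow(airflow_conn))
```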
Upvotes: 6