Reputation: 45
I have a Postgres connection configured in Airflow. Does anyone know how to get the schema, port, host, etc. of this connection so that I don't have to hardcode these values in my code?
The code below is what I've been trying, but with no success because I don't know how to pass the parameters. Does anyone know how to get these parameters using psycopg2?
def get_conn(**kwargs):
    import psycopg2
    from sqlalchemy import create_engine
    conn_string = 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format( <How to get host, port, dbname, login and password from airflow here> )
    engine = create_engine(conn_string)
    return engine
connection = psycopg2.connect(
    login=engine.user,
    password=engine.password,
    dbname=engine.dbname,
    host=engine.host,
    port=engine.port
)
task_load = PythonOperator(
    task_id="task_id",
    python_callable=get_conn,
    op_kwargs={
        'postgres_conn': get_vars['postgres_conn']
    },
    provide_context=True,
)
Upvotes: 1
Views: 3187
Reputation: 16109
You can use Airflow macros as explained in this answer but it's not really needed for your issue.
What you need is a psycopg2 connection object, and for that you can use PostgresHook. You mentioned you already have a PostgreSQL connection defined in Airflow, so all that's left to do is:
from airflow.providers.postgres.hooks.postgres import PostgresHook

def work_with_postgress():
    hook = PostgresHook(postgres_conn_id="postgres_conn_id")
    conn = hook.get_conn()  # this returns a psycopg2 connection object
    # You can also just run SQL directly with the hook
    hook.run(sql="UPDATE my_table SET my_col = 'value'")
    df = hook.get_pandas_df("SELECT * FROM my_table")  # returns a DataFrame object
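If you really do want the individual fields (host, port, schema, login, password) to build the SQLAlchemy string from the question, here is a minimal sketch. It assumes Airflow 2.x, where `BaseHook.get_connection()` returns a `Connection` object exposing those attributes. The helper names `build_postgres_uri` and `get_engine` and the sample values are made up for illustration, and the Airflow and SQLAlchemy imports are kept inside the function so the URI helper can be tried without either installed.

```python
from types import SimpleNamespace
from urllib.parse import quote_plus


def build_postgres_uri(conn):
    """Build a SQLAlchemy URI from an object with Airflow-Connection-style attributes."""
    return "postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}".format(
        quote_plus(conn.login),      # escape characters such as "@" or ":"
        quote_plus(conn.password),
        conn.host,
        conn.port,
        conn.schema,                 # for Postgres, Airflow stores the database name here
    )


def get_engine(conn_id="postgres_conn_id"):
    """Look the connection up in Airflow and return a SQLAlchemy engine (hypothetical helper)."""
    # Lazy imports: these only need to resolve inside a running Airflow task
    from airflow.hooks.base import BaseHook
    from sqlalchemy import create_engine

    conn = BaseHook.get_connection(conn_id)
    return create_engine(build_postgres_uri(conn))


# Stand-in for an Airflow Connection, with made-up values, just to show the resulting string
example = SimpleNamespace(
    login="airflow", password="p@ss", host="localhost", port=5432, schema="mydb"
)
print(build_postgres_uri(example))
# postgresql+psycopg2://airflow:p%40ss@localhost:5432/mydb
```

In practice you may not need any of this: as far as I know, the hook also offers `hook.get_sqlalchemy_engine()` and `hook.get_uri()`, which do the same lookup for you.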
If for some reason you can't use the hook and need to work with psycopg2 directly rather than with PostgresHook, then you have to use macros and place the code inside the Python callable (I don't recommend doing that!):
def work_with_postgress(**kwargs):
    import psycopg2
    from sqlalchemy import create_engine

    conn_string = 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'.format(...)
    engine = create_engine(conn_string)
    # The parsed credentials live on engine.url; psycopg2 expects "user", not "login"
    connection = psycopg2.connect(
        user=engine.url.username,
        password=engine.url.password,
        dbname=engine.url.database,
        host=engine.url.host,
        port=engine.url.port
    )
task_load = PythonOperator(
    task_id="task_id",
    python_callable=work_with_postgress,
    op_kwargs={
        # You can pass the macros here and they will be available in the callable
    },
)
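To make the macro route a bit more concrete, here is a rough sketch of what could go into `op_kwargs`. It assumes an Airflow version that exposes the `conn` template variable (2.2 or later, as far as I know) and relies on `op_kwargs` being a templated field of `PythonOperator`. The names `conn_templates`, `work_with_psycopg2` and `build_task` are hypothetical, and the Airflow and psycopg2 imports are deferred so the template helper can be checked on its own.

```python
def conn_templates(conn_id):
    """Return Jinja templates that Airflow renders into the connection's fields at run time."""
    # psycopg2 keyword -> attribute on the Airflow connection
    fields = {
        "user": "login",
        "password": "password",
        "dbname": "schema",
        "host": "host",
        "port": "port",
    }
    return {kwarg: "{{ conn.%s.%s }}" % (conn_id, attr) for kwarg, attr in fields.items()}


def work_with_psycopg2(user, password, dbname, host, port):
    """Callable that receives the rendered values (as strings by default) and connects directly."""
    import psycopg2  # lazy import, only needed when the task runs

    connection = psycopg2.connect(
        user=user, password=password, dbname=dbname, host=host, port=port
    )
    try:
        with connection.cursor() as cur:
            cur.execute("SELECT 1")
            return cur.fetchone()[0]
    finally:
        connection.close()


def build_task():
    """Create the operator; call this inside your DAG definition."""
    from airflow.operators.python import PythonOperator

    return PythonOperator(
        task_id="task_id",
        python_callable=work_with_psycopg2,
        op_kwargs=conn_templates("postgres_conn_id"),
    )


print(conn_templates("postgres_conn_id")["host"])
# {{ conn.postgres_conn_id.host }}
```

This is only meant to show the shape of the approach. The hook version above is shorter and keeps the credentials out of the operator arguments.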
Upvotes: 1