Reputation: 1561
I have the below PostgresOperator used to run a SQL query. I want to see if I can save the output of this Airflow DAG as a Pandas DataFrame.
section_1 = PostgresOperator(
    task_id='task_id',
    default_args=args,
    postgres_conn_id="db_conn_id",
    sql="fetching_data.sql",
    dag=dag)
SQL Query:
select cast_id, prod_id, name from sales;
I would like the output to be saved as a Pandas DataFrame.
Upvotes: 1
Views: 4251
Reputation: 16109
You cannot pass a DataFrame between operators. Airflow does offer the ability to pass metadata between tasks (operators) in the form of XComs, but you shouldn't use it to pass large sets of data.
To interact with the DataFrame you'll need to work with PostgresHook:
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
def my_task():
    hook = PostgresHook(postgres_conn_id="db_conn_id")
    df = hook.get_pandas_df(sql="select cast_id, prod_id, name from sales;")
    # do what you need with the df....

run_this = PythonOperator(
    task_id='postgres_task',
    python_callable=my_task,
)
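If the DataFrame does need to reach a downstream task, a common pattern is to persist it to a file and pass only the file path through XCom (a small string is safe to pass; the data itself is not). A minimal sketch of that hand-off using plain pandas, with a stand-in DataFrame in place of the real `get_pandas_df` result (file name and columns are illustrative, not from your schema):

```python
import tempfile
from pathlib import Path

import pandas as pd


def extract_task(out_dir: str) -> str:
    # In the real DAG this DataFrame would come from
    # PostgresHook(...).get_pandas_df(...); a stand-in frame is used here.
    df = pd.DataFrame(
        {"cast_id": [1, 2], "prod_id": [10, 20], "name": ["a", "b"]}
    )
    path = Path(out_dir) / "sales.csv"
    df.to_csv(path, index=False)
    return str(path)  # push the path via XCom, not the data


def transform_task(path: str) -> pd.DataFrame:
    # Downstream task pulls the path from XCom and reloads the frame.
    return pd.read_csv(path)


with tempfile.TemporaryDirectory() as d:
    p = extract_task(d)
    df = transform_task(p)
    print(df.shape)  # (2, 3)
```

In a DAG, `extract_task`'s return value becomes the XCom that the downstream `PythonOperator` pulls.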
If you want to read the query from a file you can also do:
class SQLPythonOperator(PythonOperator):
    template_ext = PythonOperator.template_ext + ('.sql',)

def my_task(**context):
    query = context['templates_dict']['query']
    hook = PostgresHook(postgres_conn_id="db_conn_id")
    df = hook.get_pandas_df(sql=query)
    # do what you need with the df....

run_this = SQLPythonOperator(
    task_id='postgres_task',
    python_callable=my_task,
    templates_dict={'query': 'fetching_data.sql'},
)
Alternatively you can write your own custom operator that implements the logic you wish.
Upvotes: 4