Reputation: 785
I have two PythonOperators in Airflow. I need to use the output of one operator in another. How can I do that?
def get_data(**kwargs):
    '''SQL here converted to df
    '''
    return df

run_this = PythonOperator(
    task_id='data',
    provide_context=True,
    python_callable=get_data,
    dag=dag,
)
How can I use the output of run_this in another PythonOperator?
Upvotes: 0
Views: 2522
Reputation: 7815
One way to do it is to use XCom:
import airflow.utils.dates as dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import logging

default_args = {'owner': 'Airflow', 'start_date': dates.days_ago(1)}

def create_df():
    # The return value is automatically pushed to XCom
    return pd.DataFrame({'a': [1, 2, 3]})

def read_df(task_instance, **kwargs):
    # Pull the return value of the `get_data` task from XCom
    df = task_instance.xcom_pull(task_ids='get_data')
    logging.info(df)  # Print the df to the log of the `use_data` task

with DAG(
    'my_dag',
    schedule_interval='@hourly',
    default_args=default_args,
    catchup=False,
) as dag:
    get_data = PythonOperator(task_id='get_data', python_callable=create_df)
    use_data = PythonOperator(
        task_id='use_data', provide_context=True, python_callable=read_df,
    )
    get_data >> use_data
The other way is to store the data yourself in create_df() (e.g. to a file on a local disk) and read it back in read_df(). The path to the file can be stored in a global variable. This approach is useful when the dataframe cannot be pickled, or when a more suitable serialization is needed (e.g. because the dataframe is too big).
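A minimal sketch of that file-based hand-off, using plain pandas outside of Airflow (the path and the CSV format are illustrative assumptions; in a real DAG the two functions would be the python_callables of the two tasks):

```python
import os
import tempfile

import pandas as pd

# Path known to both tasks; here a global variable, but it could also
# come from an Airflow Variable or DAG-level configuration.
DATA_PATH = os.path.join(tempfile.gettempdir(), 'my_dag_df.csv')

def create_df():
    # Write the dataframe to disk instead of returning it (no XCom involved)
    df = pd.DataFrame({'a': [1, 2, 3]})
    df.to_csv(DATA_PATH, index=False)

def read_df():
    # Read the dataframe back from the shared path
    return pd.read_csv(DATA_PATH)

create_df()
df = read_df()
print(df['a'].tolist())  # [1, 2, 3]
```

Note that this only works when both tasks run on a worker that sees the same filesystem; otherwise the file has to go to shared storage.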
Upvotes: 1
Reputation: 2352
XCom is intended for sharing small pieces of information, like the length of a SQL table, specific values, and things like that. It is not made for sharing dataframes (which can be huge), because the shared information is written to the metadata database.
So either export your dataframe somehow (upload it to S3 or other cloud storage, save it as a CSV on your machine, ...) and read it in the next operator, or combine the two operators into one.
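The "combine the two operators into one" option could be sketched like this (extract_df and process_df are hypothetical stand-ins for the SQL query and the downstream logic; the single callable would then be the python_callable of one PythonOperator):

```python
import pandas as pd

def extract_df():
    # Stand-in for the SQL query that builds the dataframe
    return pd.DataFrame({'a': [1, 2, 3]})

def process_df(df):
    # Stand-in for whatever the second operator would do with the df
    return int(df['a'].sum())

def extract_and_process():
    # One task: the dataframe never leaves the worker's memory,
    # so nothing large is written to the metadata database
    df = extract_df()
    return process_df(df)

result = extract_and_process()
print(result)  # 6
```

The trade-off is that the two steps can no longer be retried or monitored independently in the Airflow UI.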
Upvotes: 2