Reputation: 155
I'm building a data flow with Apache Airflow that grabs some data into a pandas DataFrame and then stores it in MongoDB. I have two Python methods: one that fetches the data and returns the DataFrame, and another that stores it in the relevant database. How do I take the output of one task and feed it as the input to another task? This is what I have so far (a summarized and condensed version).
I looked into the concept of XCom push and pull, and that's what I implemented below. I also saw that there's a MongoHook for Airflow, but I wasn't quite sure how to use it (my rough guess at its usage is sketched after the code).
import pandas as pd
import pymongo
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator


def get_data(name, **context):
    data = pd.read_csv('dataset.csv')
    df = data.loc[data.name == name]
    context['ti'].xcom_push(key='data', value=df)


def push_to_db(df, dbname, collection):
    client = pymongo.MongoClient(-insert creds here-)
    db = client[dbname][collection]
    data = df.to_dict(orient='records')
    db.insert_many(data)


args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 2,
}

dag = DAG(
    dag_id='simple_xcom',
    default_args=args,
    start_date=datetime(2019, 9, 2),
    schedule_interval="@daily",
)

task1 = PythonOperator(task_id='get-data',
                       params={'name': 'John'},
                       python_callable=get_data,
                       provide_context=True, dag=dag)

task2 = PythonOperator(task_id='load-db',
                       params={'df': context['ti'].xcom_pull(task_ids='get-data', key='data'),
                               'dbname': 'person', 'collection': 'salary'},
                       python_callable=push_to_db,
                       provide_context=True, dag=dag)

task1 >> task2
Every time I try to run it, it reports that context does not exist. So maybe I'm doing something wrong in terms of feeding the output of one task as the input to another?
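For what it's worth, here is my rough, untested guess at how the MongoHook would replace the raw pymongo client; I'm assuming a Mongo connection has been configured in Airflow under the default mongo_default connection id:

from airflow.contrib.hooks.mongo_hook import MongoHook

def push_to_db(df, dbname, collection):
    # assumes an Airflow connection named 'mongo_default' holds the Mongo credentials
    hook = MongoHook(conn_id='mongo_default')
    hook.insert_many(collection, df.to_dict(orient='records'), mongo_db=dbname)

Is that roughly the intended usage?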
Upvotes: 0
Views: 2615
Reputation: 821
As the answer above suggests, a custom XCom backend can resolve this problem (a generic sketch of what such a backend looks like follows the example below).
We recently implemented a custom XCom backend for Airflow, backed by vineyard, to support exactly this kind of case. The vineyard XCom backend enables zero-copy data sharing between tasks in a DAG, and supports Python values like numpy.ndarray, pandas.DataFrame, and tensors from TensorFlow/MXNet/PyTorch.
The provider is open source here: https://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow
With the vineyard XCom backend, users can write a DAG that produces and consumes a pandas.DataFrame directly, without any to_csv/read_csv hacks:
import numpy as np
import pandas as pd

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
    @task()
    def extract():
        # The DataFrame is returned as-is; the XCom backend takes care of sharing it.
        order_data = pd.DataFrame({
            'a': np.random.rand(100000),
            'b': np.random.rand(100000),
        })
        return order_data

    @task(multiple_outputs=True)
    def transform(order_data: pd.DataFrame):
        return {"total_order_value": order_data["a"].sum()}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


taskflow_etl_pandas_dag = taskflow_etl_pandas()
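For readers unfamiliar with the mechanism: in Airflow 2, a custom XCom backend is a subclass of airflow.models.xcom.BaseXCom that overrides how values are serialized into and deserialized out of the XCom table. Below is a minimal illustrative sketch, not vineyard's actual implementation; the parquet-files-on-local-disk storage is purely a stand-in and only works on a single-machine deployment:

import uuid

import pandas as pd
from airflow.models.xcom import BaseXCom


class LocalParquetXComBackend(BaseXCom):
    """Keep DataFrames out of the metadata DB; pass only a file reference through XCom."""

    @staticmethod
    def serialize_value(value):
        if isinstance(value, pd.DataFrame):
            # A real backend would write to shared storage (S3, vineyard, ...).
            path = f"/tmp/airflow_xcom_{uuid.uuid4().hex}.parquet"
            value.to_parquet(path)
            value = {"__df_path__": path}
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, dict) and "__df_path__" in value:
            value = pd.read_parquet(value["__df_path__"])
        return value

A backend like this only takes effect once it is registered through Airflow's xcom_backend configuration option (the AIRFLOW__CORE__XCOM_BACKEND environment variable); the exact class path for the vineyard backend is documented in the repository linked above.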
Hope that helps in your case.
Upvotes: 0
Reputation: 188
Have a look at the example XCom DAG, which demonstrates the push/pull pattern you're after:
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py
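The key point it illustrates is that the pull must happen inside the downstream callable at run time, not in params at DAG-definition time (which is why you get the "context does not exist" error). A condensed sketch using your task ids; note that plain XCom stores values in Airflow's metadata database (and pushing a DataFrame relies on XCom pickling being enabled), so this is only sensible for small data:

import pandas as pd


def get_data(**context):
    df = pd.read_csv('dataset.csv')
    # push at run time, under an explicit key
    context['ti'].xcom_push(key='data', value=df)


def push_to_db(**context):
    # pull at run time, inside the callable, where 'context' actually exists
    df = context['ti'].xcom_pull(task_ids='get-data', key='data')
    print(df.head())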
Upvotes: 1