Sam

Reputation: 155

Using the output of one Python task as the input to another Python task in Airflow

I'm creating a data flow with Apache Airflow that fetches some data into a Pandas DataFrame and then stores it in MongoDB. I have two Python methods: one fetches the data and returns the DataFrame, and the other stores it in the relevant database. How do I take the output of one task and feed it as the input to another task? This is what I have so far (a summarized and condensed version).

I looked into the concept of XCom push and pull, and that's what I've implemented below. I also saw that there's a MongoHook for Airflow, but I wasn't quite sure how to use it.

import pandas as pd
import pymongo
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator


def get_data(name, **context):
    data = pd.read_csv('dataset.csv')
    df = data.loc[data.name == name]
    context['ti'].xcom_push(task_ids=['get-data'], value=data)

def push_to_db(df, dbname, collection):
    client = pymongo.MongoClient(-insert creds here-)
    db = client[dbname][collection]
    data = df.to_dict(orient='records')
    db.insert_many(data)

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 2,
}

dag = DAG(
  dag_id='simple_xcom',
  default_args=args,
  start_date=datetime(2019, 9, 2),
  schedule_interval="@daily",
)

task1 = PythonOperator(task_id='get-data', params={'name': 'John'},
        python_callable=get_data, 
        provide_context=True, dag=dag)

task2 = PythonOperator(task_id='load-db', params={'df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'),
    'dbname': 'person', 'collection': 'salary'},
    python_callable=push_to_db, provide_context=True, dag=dag) 

task1 >> task2 


Every time I try to run it, it says that context does not exist. So maybe I'm doing something wrong in terms of feeding the output of one task as the input to another?

Upvotes: 0

Views: 2615

Answers (2)

sighingnow

Reputation: 821

As in the answer above, a custom XCom backend can resolve this problem.

We have recently implemented a custom XCom backend for Airflow, backed by Vineyard, to support this kind of use case.

The Vineyard XCom backend enables zero-copy data sharing between tasks in a DAG, and supports Python values like numpy.ndarray, pandas.DataFrame, and tensors from TensorFlow/MXNet/PyTorch. The provider is open source here: https://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow

With the Vineyard XCom backend, users can have a DAG that produces and consumes a pandas.DataFrame directly, without any "to_csv" + "from_csv" hacks:

import numpy as np
import pandas as pd

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
    @task()
    def extract():
        order_data_dict = pd.DataFrame({
            'a': np.random.rand(100000),
            'b': np.random.rand(100000),
        })
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: pd.DataFrame):
        return {"total_order_value": order_data_dict["a"].sum()}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])

taskflow_etl_pandas_dag = taskflow_etl_pandas()
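
For completeness, a custom XCom backend is plugged in through Airflow's [core] xcom_backend setting (Airflow 2.0+). A minimal sketch of the configuration is below; the class path is an assumption based on the linked repository, so check its README for the exact value.

# airflow.cfg
[core]
# class path assumed from the repository above; verify before use
xcom_backend = vineyard.contrib.airflow.xcom.VineyardXCom

# or equivalently, via environment variable:
# AIRFLOW__CORE__XCOM_BACKEND=vineyard.contrib.airflow.xcom.VineyardXCom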

Hope that helps in your case.

Upvotes: 0

Sivasankar Boomarapu
Sivasankar Boomarapu

Reputation: 188

Have a look at the example XCom DAG:

https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py
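
In case that link moves, here is a minimal sketch of the same push/pull pattern, written against the classic Airflow 1.10-style API used in the question (the dag_id, task_ids, and key names are illustrative):

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def push_value(**context):
    # push a value under an explicit key; Airflow stores it in the XCom table
    context['ti'].xcom_push(key='my_value', value=42)


def pull_value(**context):
    # pull the value inside the downstream callable, at run time,
    # by referencing the upstream task_id and the key used above
    value = context['ti'].xcom_pull(task_ids='push-value', key='my_value')
    print(value)


dag = DAG(
    dag_id='xcom_sketch',
    default_args={'owner': 'Airflow', 'start_date': days_ago(2)},
    schedule_interval='@daily',
)

push_task = PythonOperator(task_id='push-value', python_callable=push_value,
                           provide_context=True, dag=dag)

pull_task = PythonOperator(task_id='pull-value', python_callable=pull_value,
                           provide_context=True, dag=dag)

push_task >> pull_task

The important detail for the question above is that xcom_pull is called inside the downstream callable at run time; calling it at DAG definition time (for example inside params) fails because the task context does not exist yet. Note also that the default XCom backend is intended for small values, so a large pandas.DataFrame is usually better written to intermediate storage or handled by a custom XCom backend, as in the other answer.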

Upvotes: 1
