Bhupendra

Reputation: 1743

How do I pass data from one operator to another

I made a custom Airflow operator. It takes an input, and its output is pushed to XCom.

What I want to achieve is to call the operator with some defined input, parse its output with a Python callable inside a BranchPythonOperator, and then pass the parsed output to another task that uses the same operator:

from airflow.operators.python_operator import BranchPythonOperator

CustomOperator_Task1 = CustomOperator(
    data={
        'type': 'custom',
        'date': '2017-11-12'
    },
    task_id='CustomOperator_Task1',
    dag=dag)

data = {}
def checkOutput(**kwargs):
    result = kwargs['ti'].xcom_pull(task_ids='CustomOperator_Task1')

    if result.success:
        data = result.data
        return "CustomOperator_Task2"
    return "Failure"

BranchOperator_Task = BranchPythonOperator(
    task_id='BranchOperator_Task',
    dag=dag,
    python_callable=checkOutput,
    provide_context=True,
    trigger_rule="all_done")

CustomOperator_Task2 = CustomOperator(
    data= data,
    task_id='CustomOperator_Task2',
    dag=dag)

CustomOperator_Task1 >> BranchOperator_Task >> CustomOperator_Task2

In CustomOperator_Task2 I want to use the parsed data from BranchOperator_Task, but right now it is always empty ({}).

What is the best way to do that?

Upvotes: 4

Views: 4081

Answers (2)

Daniel Huang

Reputation: 6548

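I see your issue now. Setting the data variable the way you do won't work because of how Airflow works. CustomOperator_Task2 receives data when the DAG file is parsed, while data is still {}, and each task later runs in an entirely different process, so nothing checkOutput sets at run time can reach it. (Inside checkOutput, data = result.data also only binds a local variable.)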
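A minimal plain-Python sketch of why the assignment never arrives, even before separate processes come into play. No Airflow is involved: task2_kwargs stands in for the arguments CustomOperator_Task2 is constructed with, and check_output / check_output_global are hypothetical stand-ins for the branch callable.

```python
# Hypothetical stand-ins, not Airflow code.
data = {}

# The DAG file is parsed first, so Task2's argument is captured here: it is the
# empty dict object that `data` refers to at this moment.
task2_kwargs = {"data": data}


def check_output(result):
    # Without `global`, this assignment creates a new local name;
    # the module-level `data` is untouched.
    data = result["data"]
    return "CustomOperator_Task2"


def check_output_global(result):
    global data
    # Even rebinding the global does not help: task2_kwargs still holds
    # the dict object that existed when it was built.
    data = result["data"]
    return "CustomOperator_Task2"


check_output({"data": {"type": "custom"}})
check_output_global({"data": {"type": "custom"}})
print(task2_kwargs["data"])  # {}
print(data)  # {'type': 'custom'}
```

In a real deployment the branch callable also runs in a different process from the one that built CustomOperator_Task2, so a shared variable is ruled out entirely; the value has to travel through XCom.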

Instead, BranchOperator_Task has to push the parsed output into another XCom so CustomOperator_Task2 can explicitly fetch it.

def checkOutput(**kwargs):
    ti = kwargs['ti']
    result = ti.xcom_pull(task_ids='CustomOperator_Task1')

    if result.success:
        # Push the parsed payload under its own key for CustomOperator_Task2
        ti.xcom_push(key='data', value=result.data)
        return "CustomOperator_Task2"
    return "Failure"

BranchOperator_Task = BranchPythonOperator(
    ...)

CustomOperator_Task2 = CustomOperator(
    data_xcom_task_id=BranchOperator_Task.task_id,
    data_xcom_key='data',
    task_id='CustomOperator_Task2',
    dag=dag)

Then your operator might look something like this.

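from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CustomOperator(BaseOperator):

    @apply_defaults
    def __init__(self, data_xcom_task_id, data_xcom_key, *args, **kwargs):
        # BaseOperator must be initialized so task_id, dag, etc. are set
        super(CustomOperator, self).__init__(*args, **kwargs)
        self.data_xcom_task_id = data_xcom_task_id
        self.data_xcom_key = data_xcom_key

    def execute(self, context):
        # Fetch the value the branch task pushed under data_xcom_key
        data = context['ti'].xcom_pull(task_ids=self.data_xcom_task_id, key=self.data_xcom_key)
        ...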

The data_xcom_task_id and data_xcom_key parameters may not be required if you just want to hardcode the task ID and key inside execute. It depends on your use case.
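To see the handoff end to end without a running Airflow, here is a sketch that replaces the XCom table with an in-memory dict keyed by (task_id, key). FakeTI, XCOM and Result are hypothetical stand-ins that mimic only the xcom_push/xcom_pull calls used above; real XComs live in Airflow's metadata database.

```python
from collections import namedtuple

# Hypothetical in-memory stand-in for the XCom table, keyed by (task_id, key).
XCOM = {}


class FakeTI:
    """Mimics only the two TaskInstance methods used in the answer."""

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        XCOM[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return XCOM.get((task_ids, key))


# Hypothetical result shaped like the question's (success, data) object,
# pretending CustomOperator_Task1 returned it.
Result = namedtuple("Result", ["success", "data"])
XCOM[("CustomOperator_Task1", "return_value")] = Result(
    True, {"type": "custom", "date": "2017-11-12"}
)


def checkOutput(**kwargs):
    ti = kwargs["ti"]
    result = ti.xcom_pull(task_ids="CustomOperator_Task1")
    if result.success:
        ti.xcom_push(key="data", value=result.data)
        return "CustomOperator_Task2"
    return "Failure"


branch = checkOutput(ti=FakeTI("BranchOperator_Task"))
# CustomOperator_Task2 runs later and pulls by the branch task's ID and key.
data = FakeTI("CustomOperator_Task2").xcom_pull(
    task_ids="BranchOperator_Task", key="data"
)
print(branch, data)
```

The design point is that Task2 never touches a shared variable; it looks the value up by (task ID, key), which works no matter which process or worker runs it.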

Upvotes: 2

Nick

Reputation: 2903

As your comment suggests, your custom operator's execute method returns None, so xcom_pull, which by default reads the task's returned value, will come back empty. Please use xcom_push explicitly, as the default behavior of Airflow could change over time.
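A small simulation of that default, assuming the Airflow 1.x behavior where a non-None return value from execute is pushed under the key "return_value" and xcom_pull reads that key unless told otherwise. FakeTI, its run method and the "output" key are hypothetical; this is not Airflow's implementation.

```python
XCOM_RETURN_KEY = "return_value"  # key used for a task's returned value
store = {}


class FakeTI:
    """Hypothetical stand-in for a TaskInstance; not Airflow's implementation."""

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key=XCOM_RETURN_KEY):
        return store.get((task_ids, key))

    def run(self, execute):
        result = execute({"ti": self})
        # Mimics the default: a non-None return value is pushed under
        # "return_value"; returning None pushes nothing.
        if result is not None:
            self.xcom_push(XCOM_RETURN_KEY, result)


def execute_returns_nothing(context):
    # Explicit push under a hypothetical key; there is no return statement,
    # so the implicit return value is None.
    context["ti"].xcom_push(key="output", value={"success": True})


FakeTI("CustomOperator_Task1").run(execute_returns_nothing)
reader = FakeTI("BranchOperator_Task")
print(reader.xcom_pull(task_ids="CustomOperator_Task1"))  # None
print(reader.xcom_pull(task_ids="CustomOperator_Task1", key="output"))  # {'success': True}
```

Pushing explicitly and pulling with the matching key keeps the downstream task independent of whether the return value happens to be pushed automatically.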

Upvotes: 0