adan11
adan11

Reputation: 787

Pass arguments from BranchPythonOperator to PythonOperator

I am still fairly new to Airflow, and trying to figure out the logistics of passing arguments between tasks/dags. My question is - is it possible to pass arguments from a BranchPythonOperator task, into the task_id's that it calls.

ie:

@task
def task_a():
    ***print(a)***
    return {}

def get_task_run(**kwargs):
    a = 'Pass-Argument'
    return 'task_a'

tasks = BranchPythonOperator(
        task_id='get_task_run',
        python_callable=get_task_run,
    )

In the code above for example, is it possible to somehow get the variable 'a' inside the 'task_a' that is called from the BranchPythonOperator?

Upvotes: 2

Views: 2596

Answers (1)

NicoE
NicoE

Reputation: 4853

One way of doing this could be by doing an xcom_push from withing the get_task_run function and then pulling it from task_a using get_current_context.

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context, BranchPythonOperator

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1),
     catchup=False, tags=['example'])
def decorated_dag():

    @task
    def task_a():
        context = get_current_context()
        var_from_branch_task = context['ti'].xcom_pull(
            task_ids='branch_task', key='a')
        print(f"Result: {var_from_branch_task}")

    @task
    def task_b():
        print('task_b')

    def _get_task_run(ti):
        if 'something':
            ti.xcom_push(key='a', value='var_pushed_from_branch task')
            return 'task_a'
        else:
            return 'task_b'

    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=_get_task_run,
    )
    task_a_exec = task_a()
    task_b_exec = task_b()
    branch_task >> [task_a_exec, task_b_exec]

example_decorated_dag = decorated_dag()

Keep in mind that that BranchPythonOperator should return a single task_id or a list of task_ids to follow. Thats why you just can't return a dict or list or tuple to use it as XcomArg with the other decorated tasks. Let me know if that worked for you!

Upvotes: 2

Related Questions