tristobal

Passing a variable from a DAG to an external function

I have the following two files. The first one contains the DAG and two tasks (a DummyOperator and a TaskGroup).

# example_dag.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from includes.taskgroup import build_taskgroup



default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id="modularized_dag",
    schedule_interval="@once",
    start_date=days_ago(1),
    default_args=default_args,
) as dag:
    first = DummyOperator(task_id="first_task", dag=dag)
    second = build_taskgroup(dag, "lorem ipsum dolor sit amet")
    first >> second

The second file contains a function that creates and returns the TaskGroup used in the first file.

# includes/taskgroup.py
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


def print_variable(templates_dict: str):
    logging.info(f'input_text: {templates_dict}')


def build_taskgroup(dag: DAG, templates_dict: str) -> TaskGroup:

    with TaskGroup(group_id="xyzzy_taskgroup") as task_group:

        second_task = DummyOperator(task_id="second_task", task_group=task_group, dag=dag)

        third_task = PythonOperator(
            task_id="third_task",
            task_group=task_group,
            python_callable=print_variable,
            op_kwargs={'templates_dict': templates_dict},
            dag=dag,
        )

        second_task >> third_task

    return task_group

My problem is the following: in the first file I pass a variable (the input text "lorem ipsum dolor sit amet") to the function that creates the TaskGroup, and that function in turn passes it to a PythonOperator that simply prints it. I don't know why the value is not passed from the DAG to the function. When I print it, I get:

input_text: None

Am I forgetting something basic about the DAG lifecycle? Is there another way to pass a variable to the method that creates the TaskGroup?

Thanks in advance.


UPDATE

When I tried to write a snippet to replicate my problem (the original code is private, from work), I changed the name of one variable, and that rename was exactly the origin of my problem. That's why the snippet I put here worked for L. D. Nicolas May.

This was the mistake:

        third_task = PythonOperator(
            #...
            op_kwargs={'templates_dict': templates_dict},
        )

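It seems that I cannot use templates_dict as a key in op_kwargs, because templates_dict is itself a PythonOperator parameter. As far as I can tell from the Airflow 2 source, the operator merges op_kwargs into the task context and then sets the templates_dict key from its own templates_dict argument (None by default), so the value I passed was overwritten.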
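The collision can be reproduced without Airflow. The sketch below is a simplified model, assuming Airflow 2's PythonOperator merges op_kwargs into the context and then overwrites templates_dict with its own argument; run_python_callable and determine_kwargs here are hypothetical stand-ins, not Airflow's API. The callables return strings instead of logging so the result is easy to check.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def determine_kwargs(func: Callable[..., Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the context keys the callable accepts (all of them if it takes **kwargs)."""
    params = inspect.signature(func).parameters.values()
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return dict(context)
    return {p.name: context[p.name] for p in params if p.name in context}


def run_python_callable(
    func: Callable[..., Any],
    op_kwargs: Dict[str, Any],
    templates_dict: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical, simplified model of how PythonOperator builds the call."""
    context: Dict[str, Any] = {"ds": "2021-01-01"}  # hypothetical stand-in for the task context
    context.update(op_kwargs)                     # 1. op_kwargs are merged into the context
    context["templates_dict"] = templates_dict    # 2. then the operator's own argument wins
    return func(**determine_kwargs(func, context))


def print_variable(templates_dict):               # the question's callable
    return f"input_text: {templates_dict}"


def print_input_text(input_text):                 # same callable, non-reserved key
    return f"input_text: {input_text}"


def print_from_templates_dict(templates_dict):    # templates_dict used as intended
    return f"input_text: {templates_dict['input_text']}"


text = "lorem ipsum dolor sit amet"
print(run_python_callable(print_variable, {"templates_dict": text}))
print(run_python_callable(print_input_text, {"input_text": text}))
print(run_python_callable(print_from_templates_dict, {}, templates_dict={"input_text": text}))
```

The first call prints input_text: None, matching the symptom in the question; the other two print the text. In the real DAG the equivalent fixes would be renaming the key (op_kwargs={'input_text': ...} with def print_variable(input_text)) or passing templates_dict={'input_text': ...} to PythonOperator itself.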

Sorry for the mess.


Answers (2)

tristobal

After thinking twice about Jorge's answer, I worked around the problem by adding an initial operator that stores the variable in XCom. So the DAG went from first_task >> xyzzy_taskgroup to store_variable >> first_task >> taskgroup.

And the code was:

# example_dag.py
def store_variable(ti):
    ti.xcom_push(key="input_text_id", value="lorem ipsum dolor sit amet")


with DAG(...) as dag:
    zero = PythonOperator(task_id="store_variable", python_callable=store_variable, dag=dag)
    first = ...
    second = build_taskgroup(dag)
    zero >> first >> second

And in the second file:

# includes/taskgroup.py
def print_variable(ti):
    input_text = ti.xcom_pull(key='input_text_id')
    logging.info(f'input_text: {input_text}')


def build_taskgroup(dag: DAG) -> TaskGroup:
    with TaskGroup(group_id="taskgroup") as task_group:
        # ...
        third_task = PythonOperator(
            task_id="third_task",
            task_group=task_group,
            python_callable=print_variable,
            dag=dag,
        )

    # ...

This workaround works, but I still don't understand the root cause of my problem (passing variables to a function inside a DAG).
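The workaround sidesteps op_kwargs entirely: one task pushes the value under a key and a later task pulls it by that key alone. A toy in-memory model of that flow follows; FakeXComStore is hypothetical and only mimics the key-based lookup the workaround relies on, not Airflow's XCom backend.

```python
from typing import Any, Dict, Optional, Tuple


class FakeXComStore:
    """Hypothetical in-memory stand-in for the XComs of a single DAG run."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Any] = {}  # (task_id, key) -> value

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._rows[(task_id, key)] = value

    def pull(self, key: str, task_ids: Optional[str] = None) -> Any:
        # With task_ids=None only the key has to match, as in xcom_pull(key=...).
        for (task_id, row_key), value in self._rows.items():
            if row_key == key and (task_ids is None or task_id == task_ids):
                return value
        return None


store = FakeXComStore()


def store_variable() -> None:
    # Upstream task: push the value under a known key.
    store.push("store_variable", "input_text_id", "lorem ipsum dolor sit amet")


def print_variable() -> str:
    # Downstream task inside the TaskGroup: pull by key only.
    return f"input_text: {store.pull(key='input_text_id')}"


store_variable()          # runs first because of zero >> first >> second
print(print_variable())
```

Because the pull matches on the key only, the downstream task does not need to know the upstream task id; the trade-off is that two tasks pushing the same key would be ambiguous.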


Jorge Martinez

Maybe you need to use XCom values to pass the variables between tasks.

Try this in your print function:

def print_variable(input_text: str, **kwargs):
    logging.info(f'input_text: {input_text}')
    ti = kwargs['ti']
    xcom_value = ti.xcom_pull(task_ids='third_task')

Here xcom_value will be your "input_text" variable. xcom_pull is the Composer/Airflow feature for pulling values (XComs) pushed by other tasks. Also, you need to provide the context to the third_task operator:

third_task = PythonOperator(
    task_id="third_task",
    task_group=task_group,
    python_callable=print_variable,
    op_kwargs={'input_text': input_text},
    provide_context=True,  # only needed on Airflow 1.x; Airflow 2 passes the context automatically
    dag=dag,
)

With this, you will be able to pass the "input_text" variable to the print_variable function.
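One detail matters when pulling by task id: tasks created inside a TaskGroup get the group_id prefixed to their task_id by default (prefix_group_id=True), so third_task in this DAG is actually registered as xyzzy_taskgroup.third_task. The toy lookup below (full_task_id, xcoms and xcom_pull here are hypothetical, not Airflow code) shows why a bare task id finds nothing.

```python
from typing import Any, Dict, Optional, Tuple


def full_task_id(task_id: str, group_id: Optional[str] = None, prefix_group_id: bool = True) -> str:
    """Mimic TaskGroup's default naming: '<group_id>.<task_id>'."""
    if group_id and prefix_group_id:
        return f"{group_id}.{task_id}"
    return task_id


# Toy XCom table: (task_id, key) -> value, with one value pushed by the grouped task
# under "return_value" (Airflow's default key for a callable's return value).
xcoms: Dict[Tuple[str, str], Any] = {
    (full_task_id("third_task", "xyzzy_taskgroup"), "return_value"): "lorem ipsum dolor sit amet",
}


def xcom_pull(task_ids: str, key: str = "return_value") -> Any:
    return xcoms.get((task_ids, key))


print(xcom_pull(task_ids="third_task"))                  # bare id: no match
print(xcom_pull(task_ids="xyzzy_taskgroup.third_task"))  # prefixed id: match
```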

