Reputation: 506
I have the following two files. The first contains the DAG with two tasks (a DummyOperator and a TaskGroup).
# example_dag.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from includes.taskgroup import build_taskgroup
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    dag_id="modularized_dag",
    schedule_interval="@once",
    start_date=days_ago(1),
    default_args=default_args,
) as dag:
    first = DummyOperator(task_id="first_task", dag=dag)
    second = build_taskgroup(dag, "lorem ipsum dolor sit amet")
    first >> second
The second file contains a function that creates and returns the TaskGroup used in the first file.
# includes/taskgroup.py
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
def print_variable(templates_dict: str):
    logging.info(f'input_text: {templates_dict}')
def build_taskgroup(dag: DAG, templates_dict: str) -> TaskGroup:
    with TaskGroup(group_id="xyzzy_taskgroup") as task_group:
        second_task = DummyOperator(task_id="second_task", task_group=task_group, dag=dag)
        third_task = PythonOperator(
            task_id="third_task",
            task_group=task_group,
            python_callable=print_variable,
            op_kwargs={'templates_dict': templates_dict},
            dag=dag,
        )
        second_task >> third_task
    return task_group
My problem is the following: in the first file I pass a variable (input_text) to the function that creates the TaskGroup, and that function in turn passes input_text to a PythonOperator that simply prints it. I don't understand why the variable is not passed from the DAG to the function. When I print it, I get:
input_text: None
Am I forgetting something basic about the DAG lifecycle? Is there another way to pass a variable to the method that creates the TaskGroup?
Thanks in advance.
UPDATE
When I wrote a snippet to reproduce my problem (the original code is private work code), I changed the name of one variable, and that rename was exactly the origin of my problem. That's why the snippet I posted here worked for L. D. Nicolas May.
This was the culprit:
third_task = PythonOperator(
    # ...
    op_kwargs={'templates_dict': templates_dict},
)
It seems that I cannot use templates_dict as a key name in op_kwargs, probably because templates_dict is itself a PythonOperator parameter.
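A simplified pure-Python model of why the value turns into None, assuming Airflow 2.x behavior: PythonOperator.execute merges op_kwargs into the task context and then writes the operator's own templates_dict argument (None by default) under the same key, so a user key called templates_dict is overwritten before the callable runs. simulate_execute and the print_* functions are hypothetical illustrations, not Airflow API.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def simulate_execute(
    python_callable: Callable[..., Any],
    op_kwargs: Dict[str, Any],
    templates_dict: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical, simplified model of PythonOperator.execute in Airflow 2.x."""
    context: Dict[str, Any] = {"ti": object()}  # stand-in for the real task context
    # 1. op_kwargs are merged into the context first...
    context.update(op_kwargs)
    # 2. ...then the operator's own templates_dict argument is written on top,
    #    replacing any user key that happens to be called "templates_dict".
    context["templates_dict"] = templates_dict
    # 3. The callable receives the context entries matching its parameter names.
    params = inspect.signature(python_callable).parameters
    return python_callable(**{name: context[name] for name in params if name in context})


def print_variable_buggy(templates_dict: str) -> str:
    return f"input_text: {templates_dict}"


def print_variable_renamed(input_text: str) -> str:
    return f"input_text: {input_text}"


def print_variable_templates(templates_dict: Dict[str, Any]) -> str:
    return f"input_text: {templates_dict['input_text']}"


text = "lorem ipsum dolor sit amet"
buggy = simulate_execute(print_variable_buggy, {"templates_dict": text})
renamed = simulate_execute(print_variable_renamed, {"input_text": text})
via_param = simulate_execute(print_variable_templates, {}, templates_dict={"input_text": text})
print(buggy)      # input_text: None
print(renamed)    # input_text: lorem ipsum dolor sit amet
print(via_param)  # input_text: lorem ipsum dolor sit amet
```

In the real DAG the equivalent fixes would be renaming the key (op_kwargs={'input_text': ...} with def print_variable(input_text: str)), or passing the value through the operator's own templates_dict={'input_text': ...} argument and reading templates_dict['input_text'] inside the callable.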
Sorry for the mess.
Upvotes: 0
Views: 2083
Reputation: 506
After thinking twice about Jorge's answer, I worked around it by creating an initial operator that stores the variable. So I pass from...
And the code was:
# example_dag.py
def store_variable(ti):
    ti.xcom_push(key="input_text_id", value="lorem ipsum dolor sit amet")
with DAG(...) as dag:
    zero = PythonOperator(task_id="store_variable", python_callable=store_variable, dag=dag)
    first = ...
    second = build_taskgroup(dag)
    zero >> first >> second
And in the second file:
# includes/taskgroup.py
def print_variable(ti):
    input_text = ti.xcom_pull(key='input_text_id')
    logging.info(f'input_text: {input_text}')
def build_taskgroup(dag: DAG) -> TaskGroup:
    with TaskGroup(group_id="taskgroup") as task_group:
        # ...
        third_task = PythonOperator(
            task_id="third_task",
            task_group=task_group,
            python_callable=print_variable,
            dag=dag,
        )
        # ...
This workaround works, but I still don't know the root cause of my problem (passing variables to a function inside a DAG).
Upvotes: 1
Reputation: 1
Maybe you need to use XCom values to pass the variables between tasks.
Try this in your print function:
def print_variable(input_text: str, **kwargs):
    logging.info(f'input_text: {input_text}')
    ti = kwargs['ti']
    xcom_value = ti.xcom_pull(task_ids='third_task')
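Here xcom_value holds the value pushed to XCom by the task named in task_ids (your "input_text", if that task pushed it). xcom_pull is the feature Composer/Airflow provides for pulling values pushed by other tasks, or even by other DAGs. Also, you need to provide the context to the third_task operator (on Airflow 1.x this requires provide_context=True; on Airflow 2 the flag is deprecated because the context is passed automatically):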
third_task = PythonOperator(
    task_id="third_task",
    task_group=task_group,
    python_callable=print_variable,
    op_kwargs={'input_text': input_text},
    provide_context=True,
    dag=dag,
)
With this, you will be able to pass the "input_text" variable to the print_variable function.
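A rough pure-Python model of how Airflow 2.x hands the context to the callable, which is why ti shows up in **kwargs even without provide_context: if the callable accepts **kwargs it receives the whole context (including the merged op_kwargs), otherwise only the entries whose names match its parameters. determine_kwargs and FakeTaskInstance are hypothetical stand-ins, not Airflow's actual functions or classes.

```python
import inspect
from typing import Any, Callable, Dict


def determine_kwargs(python_callable: Callable[..., Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical simplified model of how Airflow 2.x picks the callable's kwargs."""
    params = inspect.signature(python_callable).parameters.values()
    # A **kwargs parameter receives the whole context.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return dict(context)
    # Otherwise only the context keys that match named parameters are passed.
    names = {p.name for p in params}
    return {k: v for k, v in context.items() if k in names}


class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance; only used for illustration."""


def print_variable(input_text: str, **kwargs: Any) -> str:
    ti = kwargs["ti"]  # present because **kwargs receives the full context
    return f"input_text: {input_text}, ti: {type(ti).__name__}"


# op_kwargs (here input_text) are assumed to be merged into the context already.
context = {"ti": FakeTaskInstance(), "ds": "2021-01-01", "input_text": "lorem ipsum dolor sit amet"}
result = print_variable(**determine_kwargs(print_variable, context))
print(result)  # input_text: lorem ipsum dolor sit amet, ti: FakeTaskInstance
```

A callable declared as def print_variable(input_text: str) would receive only input_text under this rule, so ti has to be either a named parameter or reachable through **kwargs.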
Upvotes: 0