Reputation: 155
How can I get an XCom value from an Airflow task and create other tasks using these values? For example:
def func_test():
    return ['task_2', 'task_3']
with DAG(
    'dag_name',
    schedule_interval="@once",
    start_date=datetime(2022, 4, 19),
    catchup=False,
    default_args={
        'depends_on_past': False,
        'retries': 0
    }
) as dag:
    task_1 = PythonOperator(
        task_id='func_test',
        python_callable=func_test,
        provide_context=True
    )

    for task in task_1.output:
        new_tasks = PythonOperator(
            task_id=task,
            python_callable=another_function,
            provide_context=True
        )
The result I'm trying to achieve is two new tasks created from the return value of task 1.
Is it possible to create new tasks based on the return value of a function?
Upvotes: 0
Views: 1564
Reputation: 3094
Dynamic task mapping was introduced in Airflow 2.3 to support this use case. While you can use "classic" Airflow operators, I suggest using dynamic task mapping in combination with the TaskFlow API, which makes it a lot easier:
import datetime

from airflow.decorators import task
from airflow.models import DAG

with DAG(
    "dag_name",
    schedule_interval="@once",
    start_date=datetime.datetime(2022, 4, 19),
    catchup=False,
    default_args={"depends_on_past": False, "retries": 0},
) as dag:

    @task
    def func_test():
        return ["task_2", "task_3"]

    @task
    def another_function(input_value):
        print(input_value)

    another_function.expand(input_value=func_test())
In the graph view, another_function shows [2], indicating two task instances were generated and run.
When using the TaskFlow API, the output of func_test is automatically stored as an XCom and used as input for the another_function task. expand() tells Airflow to generate one mapped task instance for each element in the given collection, which in this case is the output of func_test.
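Conceptually, expand() behaves like mapping a function over the upstream task's return value; each element becomes one task instance. This plain-Python sketch is only an analogy for how the values flow (it does not reproduce Airflow's scheduling, XCom storage, or parallelism):

```python
def func_test():
    # Stands in for the upstream task whose return value is stored as an XCom
    return ["task_2", "task_3"]

def another_function(input_value):
    # Stands in for the mapped task body
    return f"processed {input_value}"

# expand(input_value=func_test()) conceptually creates one mapped
# task instance per element of the returned list:
results = [another_function(value) for value in func_test()]
print(results)  # ['processed task_2', 'processed task_3']
```

Because the mapping happens at run time, the number of generated tasks follows whatever func_test returns on that run, which is exactly what a static for-loop over task_1.output at DAG-parse time cannot do.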
Upvotes: 2