Reputation: 83
I would like to calculate a list of dates before the next tasks are created, ideally with one task per date.
I tried TaskInstance.xcom_pull() to access the value returned by the first task. But when I used that in a for loop, it failed with a NoneType error, which makes sense since the value hasn't been generated yet.
I can also use partial() and expand() to create the tasks. But I think the real issue is how to get the returned value from the previous task before the following tasks are created.
Also, is it possible to create subtasks inside the python_callable?
    def date_calculator(**kwargs):
        date_list = [date(2023, 11, 1).strftime("%Y_%m_%d"), date(2023, 11, 2).strftime("%Y_%m_%d")]
        return date_list

    def test_value(**kwargs):
        ...

    with dag:
        date_calculation_task = PythonOperator(
            task_id="date_calculation",
            provide_context=True,
            python_callable=date_calculator,
            op_kwargs={
                "dag": dag,
                "start_date_str": "{{ dag_run.conf.get('start_date', 'a') }}",
                "end_date_str": "{{ dag_run.conf.get('end_date', 'b') }}",
            },
            dag=dag,
        )
        task_instance = TaskInstance(date_calculation_task, datetime.now())
        task_xcom_values = task_instance.xcom_pull(task_ids=date_calculation_task.task_id, include_prior_dates=True)
        for date in task_xcom_values:
            get_returned_value = PythonOperator(
                task_id=f"get_returned_value_{date}",
                provide_context=True,
                python_callable=test_value,
                op_kwargs={
                    "dag": dag,
                    "task_xcom_values": task_xcom_values,  # this works if not created dynamically
                },
                dag=dag,
            )
            date_calculation_task >> get_returned_value
Thanks.
Upvotes: 0
Views: 1641
Reputation: 11
I do not know the exact context and requirements behind this approach, but I would suggest the following:
Suggestion: Rather than creating the operator itself in a for loop, run the required loop inside a single operator/function, based on the input.
Example:

    from datetime import date, datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # "date_list_example" is a placeholder dag_id; use your own
    with DAG(dag_id="date_list_example", start_date=datetime(2023, 11, 20), catchup=False) as dag:

        def get_date_list():
            # Write the date calculator logic here and build a list out of it
            date_list = [date(2023, 11, 1).strftime("%Y_%m_%d"), date(2023, 11, 2).strftime("%Y_%m_%d")]
            return date_list

        def python_params(ti):
            # Runs at execution time, so the upstream return value is available via XCom
            date_list_xcoms = ti.xcom_pull(task_ids="get_date_list")
            print(f"date entered is {date_list_xcoms} - params passed")
            for date_str in date_list_xcoms:
                # Write the required per-date logic here
                print(f"processing {date_str}")

        task1 = PythonOperator(task_id="xcom_params1", python_callable=python_params)
        task2 = PythonOperator(task_id="get_date_list", python_callable=get_date_list)
        task2 >> task1
Note: As you noted, dynamic task mapping can also solve this case, creating a separate task instance for each date (or other input).
Upvotes: 1