user3145047

Reputation: 83

Airflow DAG: using returned values from previous task to create following tasks dynamically

I would like to calculate a list of dates first and then create the next tasks from it, ideally one task per date. I tried TaskInstance.xcom_pull() to access the returned value of the first task.

But when I try to use that value in a for loop, it fails with a NoneType error. That makes sense, since the value hasn't been generated yet when the DAG file is parsed.

I know I can also use partial() and expand() to create the tasks, as shown here. But I think the real issue is how to get the returned value from the previous task before the following tasks are created.

Also, is it possible to create subtasks inside the python_callable?

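from datetime import date, datetime

from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator


def date_calculator(**kwargs):
    date_list = [date(2023, 11, 1).strftime("%Y_%m_%d"), date(2023, 11, 2).strftime("%Y_%m_%d")]
    return date_list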


def test_value(**kwargs):
    ...


with dag:
    date_calculation_task = PythonOperator(
        task_id="date_calculation",
        provide_context=True,
        python_callable=date_calculator,
        op_kwargs={
            "dag": dag,
            "start_date_str": "{{ dag_run.conf.get('start_date', 'a') }}",
            "end_date_str": "{{ dag_run.conf.get('end_date', 'b') }}",
        },
        dag=dag,
    )

    task_instance = TaskInstance(date_calculation_task, datetime.now())
    task_xcom_values = task_instance.xcom_pull(task_ids=date_calculation_task.task_id, include_prior_dates=True)

    for date_str in task_xcom_values:
        get_returned_value = PythonOperator(
            task_id=f"get_returned_value_{date_str}",
            provide_context=True,
            python_callable=test_value,
            op_kwargs={
                "dag": dag,
                "task_xcom_values": task_xcom_values,  # this works if not created dynamically
            },
            dag=dag,
        )
        date_calculation_task >> get_returned_value

Thanks.

Upvotes: 0

Views: 1641

Answers (1)

Surya A

Reputation: 11

I don't know the exact context and requirements of this approach, but I'll suggest the following:

Suggestion: rather than creating the operators in a for loop, run the loop inside a single operator's function, based on the input it receives.

Example:

from datetime import date, datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

dag_id = "date_list_example"  # hypothetical DAG id


def python_params(ti):
    # Pull the list returned by the get_date_list task at run time
    date_list_xcoms = ti.xcom_pull(task_ids="get_date_list")
    print(f"date entered is {date_list_xcoms} - params passed")

    for date_str in date_list_xcoms:
        # Write the required per-date logic here
        print(f"processing {date_str}")


def get_date_list():
    # You can write the date calculator logic here and create a list out of it
    date_list = [date(2023, 11, 1).strftime("%Y_%m_%d"), date(2023, 11, 2).strftime("%Y_%m_%d")]
    return date_list


with DAG(dag_id=dag_id, start_date=datetime(2023, 11, 20), catchup=False) as dag:
    task1 = PythonOperator(task_id="xcom_params1", python_callable=python_params)
    task2 = PythonOperator(task_id="get_date_list", python_callable=get_date_list)

    task2 >> task1
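The get_date_list placeholder returns a hard-coded list, while the question's DAG passes start_date_str and end_date_str through op_kwargs. Below is a minimal, stdlib-only sketch of that "date calculator logic". It assumes both strings arrive in ISO YYYY-MM-DD form, and the helper name date_range_strings is hypothetical:

```python
from datetime import date, timedelta


def date_range_strings(start_date_str: str, end_date_str: str) -> list:
    """Hypothetical helper: every date from start to end (inclusive) as YYYY_MM_DD.

    Assumes ISO "YYYY-MM-DD" inputs; raises ValueError for an inverted range.
    """
    start = date.fromisoformat(start_date_str)
    end = date.fromisoformat(end_date_str)
    if end < start:
        raise ValueError("end_date_str must not be earlier than start_date_str")
    # One entry per day, including both endpoints
    return [
        (start + timedelta(days=offset)).strftime("%Y_%m_%d")
        for offset in range((end - start).days + 1)
    ]


print(date_range_strings("2023-11-01", "2023-11-02"))
# ['2023_11_01', '2023_11_02']
```

Inside the question's date_calculator(**kwargs) you could then return date_range_strings(kwargs["start_date_str"], kwargs["end_date_str"]). PythonOperator renders the templated op_kwargs before calling the function, so the dag_run.conf values would already be substituted by then.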

Note: As you mentioned, dynamic task mapping with partial() and expand() can also solve this case, creating a separate task for each date (or other input).

Upvotes: 1
