Francisco

Reputation: 1251

Create dynamic tasks depending on the result of an SQL query in Airflow

I am trying to create dynamic tasks with a TaskGroup, storing the query result in an Airflow Variable. The Variable is updated every N minutes based on a database query, but when it is modified the second time, the scheduler breaks down.

Basically, I need to create tasks based on the number of unique rows returned by the query.

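import pandas as pd
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with TaskGroup("task") as task:

    # Read the query result stored in the Variable "df" (a JSON string).
    # This line runs every time the scheduler parses the DAG file.
    data = Variable.get("df")

    try:
        if data != False and data != 'none':
            df = pd.read_json(data)

            # One task1 >> task2 pair per unique value in the "field" column
            for field_id in df.field.unique():
                task1 = PythonOperator(
                    # arguments omitted in the original question
                )
                task2 = PythonOperator(
                    # arguments omitted in the original question
                )

                task1 >> task2

    except:
        pass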

Is there a way to do this with a TaskGroup?

Upvotes: 4

Views: 1851

Answers (1)

Elad Kalif

Reputation: 16079

For Airflow >=2.3.0:

Support for dynamic task creation was added in AIP-42: Dynamic Task Mapping. You can read about it in the docs. Put simply, it adds a map index to tasks, so a single task can expand into a different number of instances (map indexes) in each run.

For Airflow <2.3.0:

This is not supported.

While you can call Variable.get("df") in top-level code, you shouldn't. Variables, Connections, and any other code that queries a database should be used only inside an operator's scope or through Jinja templating. The reason is that Airflow parses DAG files periodically (every 30 seconds unless you changed the default of min_file_process_interval), so top-level code that interacts with the database runs on every parse and puts a heavy load on that database. Future Airflow versions will warn about some of these cases (see PR).

Airflow tasks should be as static as possible (or slowly changing).

Upvotes: 2
