sotn

Reputation: 2111

Airflow tasks in a loop based on dag_run conf value

I am trying to create multiple Airflow tasks based on the dag_run conf input. The conf would have an array of values, and each value needs to spawn a task. The task in turn needs to pass its value to its callable. Something like this:

 #create this task in a loop
 task = PythonOperator(task_id="fetch_data", python_callable=fetch_data(value from array), retries=10)

Conf would have a value like:

{"fruits":["apple","kiwi","orange"]}

I think this can be accessed with:

kwargs['dag_run'].conf['fruits']

How do I access this value outside an operator and then create operators in a loop?

Upvotes: 2

Views: 4005

Answers (3)

Fn Fn

Reputation: 1

I had the same question. My workaround is to use Airflow Variables.

from airflow.models import Variable

# read the task list from an Airflow Variable at DAG parse time
foo_json = Variable.get("foo_baz", deserialize_json=True)

with DAG(...) as dag:
    for x in foo_json['task_list']:
        t1 = PythonOperator(
                 task_id=f'task_{x}', ...)

The Variable's content is a list that can be used to create a dynamic number of tasks with a for loop inside the DAG. Every time I need a different config for the tasks, I simply update the Variable right before triggering a DAG run. It takes two steps instead of one, but it works the same way whether you trigger manually, via the API, or from the CLI.
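For illustration, a minimal sketch of that two-step flow, assuming the Variable key foo_baz from the snippet above and a placeholder DAG id (the fruit values are taken from the question's conf example):

from airflow.models import Variable

# Step 1: write the per-run config into the Variable the DAG file
# reads at parse time (key "foo_baz" matches the snippet above)
Variable.set(
    "foo_baz",
    {"task_list": ["apple", "kiwi", "orange"]},
    serialize_json=True,
)

# Step 2: trigger the DAG, e.g. `airflow dags trigger <your_dag_id>` on the
# Airflow 2 CLI (or `airflow trigger_dag <your_dag_id>` on 1.x). The scheduler
# re-parses the DAG file, reads the updated Variable, and creates one task
# per entry in task_list.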

Upvotes: 0

Robert Wallace

Reputation: 26

I wish there were some sort of parallel-for operator in Airflow like Kubeflow has. To solve this problem with Airflow, I end up triggering another DAG run using the TriggerDagRunOperator. It looks something like this:

from airflow.operators.dagrun_operator import TriggerDagRunOperator

def trigger_extract_dag(**kwargs):
    # conf is the dict passed when this (controller) DAG run was triggered
    config = kwargs['dag_run'].conf

    for stuff_dict in config['stuff_to_extract']:
        # build and immediately execute a trigger for the extraction DAG,
        # passing one item of the list as that run's conf
        dag_task = TriggerDagRunOperator(
            task_id='trigger-extraction-dag',
            trigger_dag_id=EXTRACTION_DAG_NAME,
            conf=stuff_dict
        )
        dag_task.execute(dict())
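
For context, a rough sketch of how that callable could be wired into a "controller" DAG; the dag_id extraction_controller and the schedule are assumptions, not part of the original answer:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

controller_dag = DAG(
    dag_id='extraction_controller',   # assumed name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None            # triggered manually with a conf payload
)

with controller_dag:
    trigger_task = PythonOperator(
        task_id='trigger_extract_dag',
        python_callable=trigger_extract_dag,
        provide_context=True  # Airflow 1.x: makes dag_run available in **kwargs
    )

Triggering this controller with a conf like {"stuff_to_extract": [...]} then fires one EXTRACTION_DAG_NAME run per dict in the list.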

Upvotes: 1

Alan Ma

Reputation: 591

You can wrap your PythonOperator instantiation in a for loop that consumes the list of values.

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

dag = DAG(
    dag_id='fruit_name_printer',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@once'
)

fruits = [
    'apple',
    'orange',
    'banana'
]


def call_func(fruit_name):
    print(fruit_name)


with dag:
    for fruit in fruits:
        printer = PythonOperator(
            task_id=f'print_{fruit}',
            python_callable=call_func,
            op_kwargs={
                'fruit_name': fruit
            }
        )

Upvotes: 0
