SpaceyBot

Reputation: 513

How to use Airflow variables to run more than one Job for a DAG dynamically

I know how to run DAG's tasks dynamically using Variable and it works pretty well until you trigger multiple runs for the same DAG.

i.e., whenever a new directory with files is created under the data/to/load/ dir, a script I have runs airflow variables -set dir data/to/load/$newDir followed by airflow trigger_dag dyn_test. Now say directories "a" and "b" are created under data/to/load/ at roughly the same time, so the variable-set and trigger_dag calls each run twice, once with the value suffixed 'a' and once with 'b' of course. I do see two runs for the DAG in the Airflow GUI, but the problem is that both of them read the same directory value, either a or b. Presumably both runs pick up whichever 'airflow variables -set' call happened last. How do I resolve this? What is the way to trigger multiple runs, each taking a different value for the dir variable to loop over dynamically? My DAG looks something like this:

# Using Airflow Variables
import os
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

dir = Variable.get("dir")


args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


# Files (not subdirectories) directly under dir
filesInDir = next(os.walk(dir))[2]

for file in filesInDir:
    task1 = ...  # change 'file' structure
    task2 = ...  # store changed 'file'

    task1 >> task2

Upvotes: 1

Views: 2623

Answers (1)

Oluwafemi Sule

Reputation: 38992

The scenario described in your question is one where a first-in, first-out queue would be fitting, assuming you want to keep the current approach of explicitly setting the directory to be processed as a separate step.

That said, the Airflow CLI trigger_dag command allows passing the --conf flag to set the configuration dictionary carried by the DagRun, and I'll go that way, since you described triggering the dag right where the variable is set.

http://airflow.apache.org/cli.html#trigger_dag

Here is how that will possibly look in code.

airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'

Set the provide_context kwarg on whichever Airflow operator you use for the tasks.

The DagRun instance can then be retrieved from the context, and the dir value read from the configuration set on it.

Say you defined your tasks with Airflow's PythonOperator; then your code to retrieve dir in the python_callable will look similar to this:

def me_seeks(dag_run=None, **kwargs):
    dir = dag_run.conf['me_seeks.dir']
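For completeness, here is a minimal runnable sketch of that callable. The PythonOperator wiring in the comments is an assumption about how your tasks are defined (the task id and the me_seeks.dir conf key just mirror the trigger command above):

```python
# Sketch: the callable receives the DagRun through the task context when
# provide_context=True is set on the PythonOperator (Airflow 1.x), e.g.:
#   task1 = PythonOperator(task_id='me_seeks', python_callable=me_seeks,
#                          provide_context=True, dag=dag)

def me_seeks(dag_run=None, **kwargs):
    # dag_run.conf is the dict passed via `airflow trigger_dag --conf '...'`,
    # so each run reads its own directory value.
    return dag_run.conf['me_seeks.dir']
```

Because each trigger_dag call carries its own --conf dictionary on its own DagRun, two near-simultaneous triggers for directories "a" and "b" no longer clobber each other the way a single shared Airflow Variable does.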

Upvotes: 1
