Reputation: 513
I can't figure out how to dynamically create tasks in Airflow at schedule time. My DAG is defined before I know how many tasks are required at run time. That is, on each DAG trigger I would like to pass in the directory to be processed, and have the list of tasks for the DAG below created from it.
I haven't come up with anything beyond this so far:
args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}
dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)
dir = '/home/uname/dir'
filesInDir = next(os.walk(dir))[2]
for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'
    task1 >> task2
How should I pass the 'dir' variable when triggering the DAG, so that task1 and task2 are created once for each file present in 'dir'?
Upvotes: 1
Views: 1609
Reputation: 18844
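You can pass the directory through an Airflow Variable or an environment variable. The value is looked up each time the DAG file is parsed, so set it before you trigger the DAG.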
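import os
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

# Option 1: read the directory from an Airflow Variable named "dir"
dir = Variable.get("dir")

# Option 2: read it from an environment variable named "dir1" instead
# dir = os.environ["dir1"]

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)

# Build one task1 >> task2 pair per file found in the directory
filesInDir = next(os.walk(dir))[2]
for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'
    task1 >> task2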
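
Each task created in the loop needs its own task_id, and Airflow only accepts alphanumerics, dashes, dots and underscores in one. As a rough sketch, assuming you want one "change" and one "store" task per file, the listing and naming could be pulled into plain helpers like the ones below. The names `list_files`, `make_task_id` and `plan_tasks` and the `change`/`store` prefixes are made up for illustration and are not part of Airflow.

```python
import os
import re


def list_files(directory):
    """Return the sorted names of the files directly inside `directory`."""
    # os.walk yields (dirpath, dirnames, filenames); take the top level only.
    # The default tuple gives an empty list if the directory does not exist,
    # instead of raising StopIteration while the DAG file is being parsed.
    return sorted(next(os.walk(directory), (directory, [], []))[2])


def make_task_id(prefix, filename):
    """Build a task_id from a file name (hypothetical naming scheme)."""
    # Replace every character outside alphanumerics, dash, dot and underscore.
    safe_name = re.sub(r"[^A-Za-z0-9_.\-]", "_", filename)
    return f"{prefix}_{safe_name}"


def plan_tasks(directory):
    """Return one (task1_id, task2_id, filename) triple per file."""
    return [
        (make_task_id("change", name), make_task_id("store", name), name)
        for name in list_files(directory)
    ]
```

In the DAG file you would then loop over `plan_tasks(dir)` and pass the two ids as `task_id` to whichever operators implement task1 and task2. Note that two file names differing only in a replaced character (for example `a b` and `a_b`) would map to the same id, so this scheme assumes that does not happen in your directory.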
Upvotes: 1