Reputation: 651
Is there a way to pass a command line argument to Airflow's BashOperator? Currently, I have a Python script that accepts a date argument and performs some specific activities, like cleaning up specific folders older than the given date.
In simplified code with just one task, what I would like to do is:
from __future__ import print_function
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
default_args = {
'owner' : 'airflow'
,'depends_on_past' : False
,'start_date' : datetime(2017, 1, 18)
,'email' : ['[email protected]']
,'retries' : 1
,'retry_delay' : timedelta(minutes=5)
}
dag = DAG(
dag_id='data_dir_cleanup'
,default_args=default_args
,schedule_interval='0 13 * * *'
,dagrun_timeout=timedelta(minutes=10)
)
cleanup_task = BashOperator(
task_id='task_1_data_file_cleanup'
,bash_command='python cleanup.py --date $DATE 2>&1 >> /tmp/airflow/data_dir_cleanup.log'
#--------------------------------------^^^^^^-- (DATE variable which would have been given on command line)
#,env=env
,dag=dag
)
Thanks in advance,
Upvotes: 11
Views: 39930
Reputation: 750
The BashOperator is templated with Jinja2, meaning that you can pass arbitrary values. In your case it would be something like:
cleanup_task = BashOperator(
task_id='task_1_data_file_cleanup'
,bash_command="python cleanup.py --date {{ params.DATE }} 2>&1 >> /tmp/airflow/data_dir_cleanup.log"
,params = {'DATE' : 'this-should-be-a-date'}
,dag=dag
)
See also: https://airflow.incubator.apache.org/tutorial.html#templating-with-jinja for a broader example.
Upvotes: 21
Reputation: 51
You can try the following (worked for me):
cmd_command = "python path_to_task/[task_name.py] '{{ execution_date }}' '{{ prev_execution_date }}'"
t = BashOperator(
task_id = 'some_id',
bash_command = cmd_command,
dag = your_dag_object_name)
When I did so, Airflow rendered the variables and the command worked well. I believe this works for all template variables. (Notice that I've put the word 'python' at the start of my command because I want to run a .py script.)
My task is written to read those values as command line arguments (via sys.argv).
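For illustration, the receiving script could read the two rendered values like this. This is only a minimal sketch: read_dates is a hypothetical helper name, and the timestamps in the example call are made-up values, since the exact format Airflow renders for execution_date depends on the Airflow version.

```python
import sys


def read_dates(argv):
    """Return (execution_date, prev_execution_date) as raw strings from argv."""
    # argv[0] is the script name, followed by the two rendered template values.
    if len(argv) != 3:
        raise SystemExit(
            "usage: %s EXECUTION_DATE PREV_EXECUTION_DATE" % argv[0]
        )
    return argv[1], argv[2]


# Example with an explicit argument list; a real script would pass sys.argv.
execution_date, prev_execution_date = read_dates(
    ["task_name.py", "2017-01-18 13:00:00", "2017-01-17 13:00:00"]
)
print(execution_date, prev_execution_date)
```

The values are kept as plain strings here on purpose, so the sketch does not assume any particular timestamp format.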
Upvotes: 5
Reputation: 211
BashOperator is Jinja-templated, so params can be passed as a dictionary.
Airflow schedules the task and does not prompt you for parameters, so supplying a specific date on the command line at run time, as you describe, is not possible. However, Airflow has the notion of an execution date, which is the date for which the DAG run is scheduled, and it can be passed to the BashOperator using the macro {{ ds }} or {{ ds_nodash }} (https://airflow.incubator.apache.org/code.html#macros)
env = {}
env['DATE'] = '{{ ds }}'
cleanup_task = BashOperator(
task_id='task_1_data_file_cleanup'
,bash_command='python cleanup.py --date $DATE 2>&1 >> /tmp/airflow/data_dir_cleanup.log'
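,env=env
,dag=dag
)
Because env is a templated field of BashOperator, {{ ds }} is rendered before the command runs, and DATE is passed to the bash command as an environment variable that can be used like any other bash variable, as $DATE. Note that params would not work here: its values are only available inside templates (as {{ params.DATE }}) and are not exported to the shell.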
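On the receiving side, the script from the question still has to accept the rendered date. Below is a minimal sketch, assuming cleanup.py takes --date in the YYYY-MM-DD form that {{ ds }} renders to. The names parse_date and parse_args are hypothetical helpers for illustration and are not part of Airflow; the actual cleanup logic is left out.

```python
import argparse
from datetime import datetime


def parse_date(text):
    """Convert a YYYY-MM-DD string (the format of {{ ds }}) into a date."""
    try:
        return datetime.strptime(text, "%Y-%m-%d").date()
    except ValueError:
        raise argparse.ArgumentTypeError("expected YYYY-MM-DD, got %r" % text)


def parse_args(argv=None):
    """Parse the command line; argv=None makes argparse read sys.argv."""
    parser = argparse.ArgumentParser(
        description="Clean up folders older than a date"
    )
    parser.add_argument(
        "--date",
        type=parse_date,
        required=True,
        help="cutoff date in YYYY-MM-DD form",
    )
    return parser.parse_args(argv)


# Example with an explicit argument list instead of the real command line.
args = parse_args(["--date", "2017-01-18"])
print(args.date)
```

Validating the format in the script means a wrongly rendered or missing date fails the task early instead of silently cleaning up the wrong folders.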
Upvotes: 1