Reputation: 5355
Is it possible to refer to the default_args start_date in your Python function?
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 21),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
My Python script primarily uses subprocess to issue this command:
query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(
    database, select_database(database)[table_int], query_date)
command = 'BCP {} queryout \"{}\" -t, -c -a 10240 -S "server" -T'.format(
    query, os.path.join(path, filename))
The task I want to run uses BCP to execute a query of the form 'SELECT * FROM table WHERE date = {}'. Currently, my Python script contains all of the logic for the date variable (it defaults to yesterday). However, it would be nice to refer to default_args instead and have Airflow handle the dates.
So, to simplify: I want to use the start_date from default_args together with the schedule (it runs each day) to fill in the date variable in my BCP command. Is this the right approach, or should I keep the date logic in the Python script?
Upvotes: 4
Views: 2429
Reputation: 843
This is the right approach, but what you really need is execution_date, not start_date: start_date only marks when the DAG's schedule begins, while execution_date changes with every run. You can get execution_date as the 'ds' default variable (a YYYY-MM-DD string) through the context in a PythonOperator by setting provide_context=True. That parameter passes the set of default variables used in Jinja templating to your callable through its kwargs argument. You can read more about Default Variables and Jinja Templating in the relevant sections of the documentation:
https://airflow.incubator.apache.org/code.html#default-variables
https://airflow.incubator.apache.org/concepts.html#jinja-templating
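To see why execution_date rather than start_date is the value to use, here is a minimal sketch that mimics a daily schedule with plain datetime arithmetic. It does not call Airflow at all; the variable names schedule_interval, execution_dates and ds_values are only illustrative, and the loop is an assumption about how consecutive daily runs line up, not the scheduler's actual code.

```python
from datetime import datetime, timedelta

# start_date is fixed once, in default_args.
start_date = datetime(2016, 11, 21)
# Assumed daily schedule, as described in the question.
schedule_interval = timedelta(days=1)

# Each run gets its own execution_date; start_date never changes.
execution_dates = [start_date + i * schedule_interval for i in range(3)]

# 'ds' is the execution_date rendered as YYYY-MM-DD.
ds_values = [d.strftime('%Y-%m-%d') for d in execution_dates]
print(ds_values)
```

The first three runs would see three different 'ds' values, whereas start_date would give 2016-11-21 every time.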
Your code should look like the following:
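# Airflow 1.x import path; assumes `dag` is defined elsewhere in the DAG file
from airflow.operators.python_operator import PythonOperator

def query_db(**kwargs):
    # get the execution date in YYYY-MM-DD format
    query_date = kwargs.get('ds')
    # rest of your logic

t_query_db = PythonOperator(
    task_id='query_db',
    python_callable=query_db,
    provide_context=True,
    dag=dag)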
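As a rough sketch of how the "rest of your logic" could use that value, the snippet below feeds 'ds' into the BCP command from the question. It runs without Airflow by passing ds as a plain keyword argument, which only simulates the context. The helper build_bcp_command and the names my_db, my_table and exports are hypothetical placeholders, not part of the original script.

```python
import os


def build_bcp_command(query_date, database, table, out_path):
    """Build the BCP command string for one day's rows.

    query_date is expected as 'YYYY-MM-DD', the format of Airflow's 'ds'.
    """
    query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(
        database, table, query_date)
    return 'BCP {} queryout "{}" -t, -c -a 10240 -S "server" -T'.format(
        query, out_path)


def query_db(**kwargs):
    # 'ds' is supplied by Airflow when provide_context=True is set.
    query_date = kwargs['ds']
    # Database, table and output path are hypothetical placeholders.
    out_path = os.path.join('exports', 'my_table_{}.csv'.format(query_date))
    return build_bcp_command(query_date, 'my_db', 'my_table', out_path)


# Simulate the context Airflow would pass for the 2016-11-21 run.
command = query_db(ds='2016-11-21')
print(command)
```

With this split, the date logic lives in Airflow and the script only formats whatever date it is given, so the command can then be handed to subprocess as before.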
Upvotes: 5