trench

Reputation: 5355

Airflow ETL pipeline - using schedule date in functions?

Is it possible to refer to the default_args start_date in your Python function?

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 21),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

My Python script primarily uses subprocess to run this command:

import os

query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(
    database, select_database(database)[table_int], query_date)
command = 'BCP {} queryout "{}" -t, -c -a 10240 -S "server" -T'.format(
    query, os.path.join(path, filename))

The task I want to run uses BCP to query 'SELECT * FROM table WHERE date = {}'. Currently, my Python script contains all of the logic for the date variable (it defaults to yesterday). However, it would be nice to refer to default_args instead and have Airflow handle the dates.
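For comparison, the "defaults to yesterday" logic described above might look something like the sketch below. The actual script isn't shown, so the function name default_query_date is hypothetical. It produces the same YYYY-MM-DD string format that Airflow's 'ds' variable uses, which is what makes swapping one for the other straightforward.

```python
from datetime import date, timedelta
from typing import Optional


def default_query_date(today: Optional[date] = None) -> str:
    """Hypothetical stand-in for the script's own date logic:
    return yesterday's date as a YYYY-MM-DD string."""
    if today is None:
        today = date.today()
    return (today - timedelta(days=1)).isoformat()


print(default_query_date(date(2016, 11, 22)))
```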

So, to simplify: I want to use the default_args start_date and the schedule (which runs each day) to fill in the date in my BCP command. Is this the right approach, or should I keep the date logic in the Python script?

Upvotes: 4

Views: 2429

Answers (1)

Dmitri Safine

Reputation: 843

This is the right approach, but what you really need is execution_date, not start_date. start_date only marks when the schedule begins, while execution_date identifies the interval a particular run covers. In a PythonOperator you can get it as the 'ds' default variable (a YYYY-MM-DD string) by setting provide_context=True, which passes the set of default variables used in Jinja templating to your callable as keyword arguments (**kwargs). With a daily schedule, the run for a given ds is triggered only after that day has ended, so ds matches the "yesterday" date your script currently computes. You can read more about default variables and Jinja templating in the relevant sections of the documentation:
https://airflow.incubator.apache.org/code.html#default-variables
https://airflow.incubator.apache.org/concepts.html#jinja-templating
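To make the start_date versus execution_date distinction concrete, here is a small plain-Python model of a daily schedule. It is not Airflow's scheduler code; it only assumes the Airflow 1.x convention that a run labelled with a given execution_date covers one full day and is triggered once that day is over. The function name daily_runs is hypothetical.

```python
from datetime import datetime, timedelta


def daily_runs(start_date, n):
    """Yield (ds, earliest_trigger_time) for the first n runs of a daily schedule.

    Models the convention that the run for [execution_date, execution_date + 1 day)
    is triggered only after that interval has ended.
    """
    interval = timedelta(days=1)
    for i in range(n):
        execution_date = start_date + i * interval
        yield execution_date.strftime('%Y-%m-%d'), execution_date + interval


for ds, trigger in daily_runs(datetime(2016, 11, 21), 3):
    print(ds, 'runs at or after', trigger)
```

The first run carries ds '2016-11-21' (the start_date) but fires on 2016-11-22, which is why ds, rather than start_date, is the value to put in the WHERE clause.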

Your code should look like the following:

# Airflow 1.x import path
from airflow.operators.python_operator import PythonOperator

def query_db(**kwargs):
    # execution date as a string in YYYY-MM-DD format
    query_date = kwargs.get('ds')

    # rest of your logic

t_query_db = PythonOperator(
    task_id='query_db', python_callable=query_db,
    provide_context=True, dag=dag)
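Putting the pieces together, a callable along these lines would take 'ds' from the context and build the BCP command from the question. This is a sketch under assumptions: build_bcp_command, the 'mydb'/'mytable' names and the /tmp output path are hypothetical placeholders for the question's select_database(), table_int, path and filename. It targets the Airflow 1.x style shown above; in Airflow 2 the context is passed to **kwargs automatically and provide_context is no longer needed.

```python
import os
import subprocess


def build_bcp_command(database, table, query_date, out_path):
    """Build the BCP queryout command for one day's rows (same format as the question)."""
    query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(
        database, table, query_date)
    return 'BCP {} queryout "{}" -t, -c -a 10240 -S "server" -T'.format(
        query, out_path)


def query_db(**kwargs):
    # 'ds' is the execution date (YYYY-MM-DD) that Airflow passes in the context
    query_date = kwargs['ds']
    # Hypothetical names; replace with the real database, table and output path
    out_path = os.path.join('/tmp', 'mytable_{}.csv'.format(query_date))
    command = build_bcp_command('mydb', 'mytable', query_date, out_path)
    # Raises CalledProcessError if BCP exits with a non-zero status
    subprocess.check_call(command, shell=True)
```

Keeping the command construction in a separate function means the date handling can be checked without Airflow or a database, by calling build_bcp_command directly with a sample date.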

Upvotes: 5
