Reputation: 2953
I am really a newbie in this forum, but I have been playing with Airflow for some time for our company. Sorry if this question sounds really dumb.
I am writing a pipeline using a bunch of BashOperators. Basically, for each task, I simply want to call a REST API using 'curl'.
This is what my pipeline looks like (very simplified version):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine( - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime =
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
If you notice I am doing current_datetime=
Instead what I want here is 'execution_date'
How do I use 'execution_date' directly and assign it to a variable in my python file?
I am having this general issue of accessing args. Any help will be genuinely appreciated.
Upvotes: 90
Views: 185302
Reputation: 139
Here's another way without context. Using the DAG's last execution time can be very helpful in scheduled ETL jobs, such as a DAG that 'downloads all newly added files'. Instead of hardcoding a datetime.datetime, use the DAG's last execution date as your time filter.
Airflow has a class called DagRun, and a DAG's runs can be looked up like so: dag_runs = DagRun.find(dag_id=dag_id)
Here's an easy way to get the most recent run's execution time:
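from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    # After sorting, dag_runs[0] is the newest run; [1] is the one before it.
    return dag_runs[1] if len(dag_runs) > 1 else None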
Then, within your PythonOperator, you can dynamically access the DAG's last execution by calling the function you created above:
last_execution = get_most_recent_dag_run('dag')
Now it's a variable!
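A slightly more defensive variant of the same idea, as a sketch only. The selection logic is split into a small helper (`pick_previous_run`, a hypothetical name) so it can be checked without a running Airflow database. It assumes `DagRun.find` returns objects with an `execution_date` attribute, as in the function above, and that `DagRun` can be imported from `airflow.models`.

```python
from datetime import datetime
from types import SimpleNamespace


def pick_previous_run(dag_runs):
    """Return the second-newest run by execution_date, or None."""
    ordered = sorted(dag_runs, key=lambda run: run.execution_date, reverse=True)
    # ordered[0] is the newest run (the current one when this is called from
    # inside a running task), so the previous run is ordered[1].
    return ordered[1] if len(ordered) > 1 else None


def get_previous_execution_date(dag_id):
    """Look up the previous run's execution_date for dag_id (needs Airflow)."""
    # Imported lazily so the helper above can be used without Airflow installed.
    from airflow.models import DagRun

    previous = pick_previous_run(DagRun.find(dag_id=dag_id))
    return previous.execution_date if previous else None


# Stand-in objects for illustration only; real code would pass DagRun instances.
fake_runs = [
    SimpleNamespace(execution_date=datetime(2020, 1, 1)),
    SimpleNamespace(execution_date=datetime(2020, 1, 3)),
    SimpleNamespace(execution_date=datetime(2020, 1, 2)),
]
previous_run = pick_previous_run(fake_runs)
```

Returning the `execution_date` rather than the whole run object gives you a value you can use directly as a time filter.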
Upvotes: -1
Reputation: 131
To print the execution date inside the callable function of your PythonOperator, you can use the following in your Airflow script. You can also add start_time and end_time as follows:
def python_func(**kwargs):
    execution_date = kwargs["execution_date"]  # <datetime> type with timezone
    end_time = str(execution_date)
    start_time = str(execution_date.add(minutes=-30))
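The `.add()` call above relies on the Pendulum-style datetime that Airflow passes in. As a rough sketch, the same window can be computed with `datetime.timedelta`, which works on any datetime object. The helper name `time_window` and the sample date are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone


def time_window(execution_date, minutes=30):
    """Return (start_time, end_time) as ISO-8601 strings ending at execution_date."""
    start = execution_date - timedelta(minutes=minutes)
    return start.isoformat(), execution_date.isoformat()


def python_func(**kwargs):
    # Airflow supplies execution_date in the task context.
    start_time, end_time = time_window(kwargs["execution_date"])
    return start_time, end_time


# Calling the function by hand with a made-up date, for illustration only.
window = python_func(execution_date=datetime(2020, 1, 1, 12, 0, tzinfo=timezone.utc))
```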
I have converted the datetime value to string as I need to pass it in a SQL Query. We can use it otherwise also.
Upvotes: 9
Reputation: 666
The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any jinja2 methods to manipulate it.
Use the following in your BashOperator bash_command string:
# pass in the first of the current month
{{ execution_date.replace(day=1) }}
# last day of previous month
{{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
If you just want the string equivalent of the execution date, ds will return a datestamp (YYYY-MM-DD), ds_nodash returns the same without dashes (YYYYMMDD), etc. More on macros is available in the API docs.
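To see what those template expressions evaluate to, the same arithmetic can be tried in plain Python. This is only an illustration with a made-up date: inside a template, `macros.timedelta` is `datetime.timedelta`, and the `strftime` formats below mirror the `ds` and `ds_nodash` shapes described above.

```python
from datetime import datetime, timedelta

execution_date = datetime(2020, 3, 15)  # made-up example date

# Equivalent of {{ execution_date.replace(day=1) }}
first_of_month = execution_date.replace(day=1)

# Equivalent of {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
last_of_previous_month = execution_date.replace(day=1) - timedelta(days=1)

# Same shapes as the ds and ds_nodash template variables
ds = execution_date.strftime("%Y-%m-%d")
ds_nodash = execution_date.strftime("%Y%m%d")
```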
Your final operator would look like:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
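A variant that avoids `% locals()`, as a sketch. It builds the template string in a small helper (`build_curl_command`, a hypothetical name) and leaves `{{ ds }}` untouched so Airflow can render it at run time. The import path assumes Airflow 2.x, and `"localhost"` is a placeholder host.

```python
def build_curl_command(hostname):
    """Return a bash_command template; Airflow renders {{ ds }} at run time."""
    return "curl -XPOST '" + hostname + ":8000/run?st={{ ds }}'"


def make_rest_task(dag, hostname):
    """Create the BashOperator (requires Airflow to be installed)."""
    # Airflow 2.x import path (an assumption); Airflow 1.x used
    # airflow.operators.bash_operator instead.
    from airflow.operators.bash import BashOperator

    return BashOperator(
        task_id="rest-api-1",
        bash_command=build_curl_command(hostname),
        dag=dag,
    )


command = build_curl_command("localhost")  # "localhost" is a placeholder host
```

Plain concatenation keeps the Jinja braces literal, so there is no need to escape them as you would with `str.format`.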
Upvotes: 65
Reputation: 61
You may consider SimpleHttpOperator. It makes HTTP requests simple: you can pass execution_date through the endpoint parameter via a template.
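A minimal sketch of what that could look like, assuming the HTTP provider package is installed and an Airflow connection with the hypothetical id `rest_api` points at your host. The import path is the Airflow 2.x provider location, and newer provider releases may expose the operator under a different name.

```python
# Templated endpoint: Airflow renders {{ ds }} when the task runs.
ENDPOINT_TEMPLATE = "run?st={{ ds }}"


def make_http_task(dag):
    """Create the HTTP task (requires Airflow and its HTTP provider)."""
    # Imported lazily; this path is an assumption about the installed version.
    from airflow.providers.http.operators.http import SimpleHttpOperator

    return SimpleHttpOperator(
        task_id="rest-api-1",
        http_conn_id="rest_api",  # hypothetical connection id
        endpoint=ENDPOINT_TEMPLATE,
        method="POST",
        dag=dag,
    )
```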
Upvotes: 0
Reputation: 2193
I think you can't assign variables with values from the Airflow context outside of a task instance; they are only available at run time. Basically, there are two different steps when a DAG is loaded and executed in Airflow:
First, your DAG file is interpreted and parsed. It has to work and compile, and the task definitions must be correct (no syntax errors or anything). During this step, if you make function calls to fill in some values, these functions won't be able to access the Airflow context (the execution date, for example, even more so if you're doing some backfilling).
The second step is the execution of the DAG. It's only during this second step that the variables provided by Airflow (execution_date, ds, etc.) are available, as they are related to an execution of the DAG.
So you can't initialize global variables using the Airflow context. However, Airflow gives you multiple mechanisms to achieve the same effect:
Using a Jinja template in your command (it can be in a string in the code or in a file; both will be processed). The list of available template variables is in the Airflow documentation. Note that some functions are also available, particularly for computing day deltas and formatting dates.
Using a PythonOperator to which you pass the context (with the provide_context argument). This will allow you to access the same template variables with the syntax kwargs['<variable_name>']. If you need to, you can return a value from a PythonOperator; it will be stored in an XCom variable you can use later in any template. XCom variables are accessed with this syntax:
If you write your own operator, you can access Airflow variables through the context dict.
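The first two mechanisms can be combined, roughly as follows. This is a sketch rather than a definitive setup: the task ids, the helper name `compute_start`, and the host are made up, and the import paths assume Airflow 2.x (where the context is passed automatically; Airflow 1.x needs `provide_context=True`).

```python
def compute_start(**kwargs):
    """python_callable: PythonOperator pushes the return value to XCom."""
    # ds is only available here, at run time, not while the DAG file is parsed.
    return kwargs["ds"] + "T00:00:00"


# Template that pulls the value pushed by the task with id "compute_start".
CURL_TEMPLATE = (
    "curl -XPOST 'localhost:8000/run?st="
    "{{ ti.xcom_pull(task_ids='compute_start') }}'"
)


def build_tasks(dag):
    """Wire the two tasks together (requires Airflow to be installed)."""
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator

    push = PythonOperator(task_id="compute_start", python_callable=compute_start, dag=dag)
    pull = BashOperator(task_id="call_api", bash_command=CURL_TEMPLATE, dag=dag)
    push >> pull  # run the curl call only after the value has been pushed
    return push, pull
```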
Upvotes: 35
Reputation: 1291
def execute(self, context):
    execution_date = context.get("execution_date")
This should be inside the execute() method of your Operator.
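For context, a complete custom operator built around that method might look like the sketch below. The class name `ExecutionDateOperator` and the helper `read_execution_date` are hypothetical, and the fallback `BaseOperator` exists only so the snippet can be run without Airflow installed.

```python
from datetime import datetime

try:
    from airflow.models import BaseOperator
except ImportError:
    # Minimal stand-in so the sketch can be run without Airflow installed.
    class BaseOperator:
        def __init__(self, task_id=None, **kwargs):
            self.task_id = task_id


def read_execution_date(context):
    """Return the run's execution date from the task context as a string."""
    return str(context.get("execution_date"))


class ExecutionDateOperator(BaseOperator):
    """Hypothetical operator that returns the run's execution date."""

    def execute(self, context):
        # Airflow calls execute() with the task context at run time.
        return read_execution_date(context)


# Calling the helper by hand with a made-up context, for illustration only.
example = read_execution_date({"execution_date": datetime(2020, 1, 1)})
```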
Upvotes: 23
Reputation: 2908
The PythonOperator constructor takes a 'provide_context' parameter. If it's True, then it passes a number of parameters into the python_callable via kwargs. kwargs['execution_date'] is what you want, I believe.
Something like this:
from airflow.models import Variable

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
doit = PythonOperator(
I'm not sure how to do it with the BashOperator, but you might start with this issue:
Upvotes: 49