n1r44
n1r44

Reputation: 581

Macros in the Airflow Python operator

Can I use macros with the PythonOperator? I tried following, but I was unable to get the macros rendered:

dag = DAG(
    'temp',
    default_args=default_args,
    description='temp dag',
    schedule_interval=timedelta(days=1))

def temp_def(a, b, **kwargs):
    print '{{ds}}'
    print '{{execution_date}}'
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs))

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm , ds],
    provide_context=False,
    dag=dag)

Upvotes: 24

Views: 17601

Answers (2)

cr0atIAN
cr0atIAN

Reputation: 979

In my opinion a more native Airflow way of approaching this would be to use the included PythonOperator and use the provide_context=True parameter as such.

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    provide_context=True,
    dag=dag)

Now you have access to all of the macros, airflow metadata and task parameters in the kwargs of your callable

def temp_def(**kwargs):
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date']))

If you had some custom defined params associated with the task you could access those as well via kwargs['params']

Upvotes: 31

jhnclvr
jhnclvr

Reputation: 9487

Macros only get processed for templated fields. To get Jinja to process this field, extend the PythonOperator with your own.

class MyPythonOperator(PythonOperator):
    template_fields = ('templates_dict','op_args')

I added 'templates_dict' to the template_fields because the PythonOperator itself has this field templated: PythonOperator

Now you should be able to use a macro within that field:

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm , ds],
    provide_context=False,
    dag=dag)

Upvotes: 29

Related Questions