Sam

Reputation: 747

Airflow is treating a Jinja template as a string

In Airflow I'm trying to use a Jinja template, but it is not being parsed and is instead treated as a plain string. Please see my code: ``

from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

def test_method(dag,network_id,schema_name):
    print "Schema_name in test_method", schema_name
    third_task = PythonOperator(
        task_id='first_task_' + network_id,
        provide_context=True,
        python_callable=print_context2,
        dag=dag)
    return third_task

dag = DAG('testing_xcoms_pull', description='Testing Xcoms',
          schedule_interval='0 12 * * *',
          start_date= datetime.today(),
          catchup=False)


def print_context(ds, **kwargs):
    return 'Returning from print_context'

def print_context2(ds, **kwargs):
    return 'Returning from print_context2'

def get_schema(ds, **kwargs):
    # Returning schema name based on network_id
    schema_name = "my_schema"
    return schema_name

first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=print_context,
    dag=dag)


second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=get_schema,
    dag=dag)

network_id = '{{ dag_run.conf["network_id"]}}'

first_task >> second_task >> test_method(
                    dag=dag,
                    network_id=network_id,
                    schema_name='{{ ti.xcom_pull("second_task")}}')

``

DAG creation fails because '{{ dag_run.conf["network_id"]}}' is taken as a literal string by Airflow. Can anyone help me find the problem in my code?

Upvotes: 5

Views: 7990

Answers (2)

joebeeson

Reputation: 4366

A DAG object, and its definition code, isn't parsed within the context of an execution; it's parsed with regard to the environment available to it when loaded by Python.

The network_id variable, which you use to define the task_id in your function, isn't templated prior to execution. It can't be, since there is no execution active. Even with templating, you still need a valid, static, non-templated task_id value to instantiate a DAG object.
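
To make the parse-time versus run-time distinction concrete, here is a minimal sketch in plain Python with no Airflow import. It assumes a task_id rule along the lines of "alphanumerics, dashes, dots and underscores only"; the regex, `is_valid_task_id`, `static_task_id` and `FakeDagRun` are hypothetical stand-ins for illustration, not Airflow's own code.

```python
import re

# Assumed stand-in for Airflow's task_id rule (alphanumerics, dashes, dots,
# underscores). The real check lives inside Airflow itself.
VALID_TASK_ID = re.compile(r"^[A-Za-z0-9_.\-]+$")

network_id = '{{ dag_run.conf["network_id"]}}'

# At parse time this is ordinary string concatenation; nothing renders it.
templated_task_id = 'first_task_' + network_id
static_task_id = 'first_task_for_network'  # hypothetical static name


def is_valid_task_id(task_id):
    """Return True if task_id satisfies the assumed naming rule."""
    return VALID_TASK_ID.match(task_id) is not None


class FakeDagRun(object):
    """Illustrative stand-in for the dag_run object found in the task context."""

    def __init__(self, conf):
        self.conf = conf


def print_context2(ds=None, **kwargs):
    # At run time the context exists, so the value can be read here
    # instead of being baked into the task_id.
    return kwargs['dag_run'].conf['network_id']


print(templated_task_id)
print(is_valid_task_id(templated_task_id))
print(is_valid_task_id(static_task_id))
print(print_context2(ds='2018-01-01',
                     dag_run=FakeDagRun({'network_id': 'net42'})))
```

The idea is to keep the task_id static and read `network_id` inside the callable, where the execution context (passed in because of `provide_context=True`) is available.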

Upvotes: 0

trejas

Reputation: 1031

Airflow operators have a class attribute called template_fields. It is usually declared at the top of the operator class; check out any of the operators in the GitHub code base.

If the field you are trying to pass Jinja template syntax into is not in the template_fields list, the Jinja syntax will be passed through unrendered, as a plain string.
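
The effect can be sketched with a toy model in plain Python. This is not Airflow's implementation: `ToyOperator`, `render` and `render_listed_fields` are hypothetical names, and a small regex substitution stands in for Jinja. It only illustrates that attributes named in template_fields get rendered while everything else stays a literal string.

```python
import re

# Toy renderer standing in for Jinja: replaces {{ name }} with context[name].
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(text, context):
    """Substitute simple {{ name }} placeholders using the given context."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), text)


class ToyOperator(object):
    # Only attributes named here are rendered before execution.
    template_fields = ('command',)

    def __init__(self, task_id, command, label):
        self.task_id = task_id
        self.command = command  # listed in template_fields
        self.label = label      # not listed, so never rendered

    def render_listed_fields(self, context):
        """Render every attribute named in template_fields, in place."""
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


op = ToyOperator(task_id='demo',
                 command='echo {{ ds }}',
                 label='run {{ ds }}')
op.render_listed_fields({'ds': '2018-01-01'})

print(op.command)  # rendered
print(op.label)    # still contains the braces
```

For the PythonOperator used in the question, `templates_dict` is one of the templated fields, so values passed through it are rendered before the callable runs; task_id is not templated on any operator.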

Upvotes: 3
