Arya
Arya

Reputation: 531

jinja2.exceptions.TemplateSyntaxError: unexpected 'end of template' - how to pass array values run time to a variable in dag

we are trying to pass array ['/home/arya/'] under params as below and while passing the arrays we are facing jinja2.exceptions.TemplateSyntaxError: unexpected 'end of template'

and for integers as well we are facing issue.

with DAG(
    dag_id="INSURANCE_CALC",
    start_date= datetime(2022, 1, 24),
    schedule_interval=None,
    default_args=default_args,
    params= {
        "param2": "[email protected]",
        "sourcedir": ['/home/arya/'],
        "timenum": 0
    },
    catchup=False
) as dag


for number in range(0,len('{{ params.sourcedir}}'))):
    srcdir.append(("abc" + '{{ params.sourcedir}}'[number] + "xyz"))
    taskinfo = Operator(   task_id="taskinfo",  sourcedir=srcdir[task_no] )

reading above params as '{{ params.sourcedir}}' and passing them as a variable to the tasks.

An suggestion how to handle this ?

Upvotes: 0

Views: 580

Answers (1)

ozs
ozs

Reputation: 3661

  1. range should get an integer number not a list.

  2. by default each param type is string. if you want to get it as its native type, you should add render_template_as_native_obj=True to you DAG definition.

  3. Because sourcedir is param and can be changed during execution, you need to use Dynamic Tasks (Airflow>=2.3.0) or creates a task the executes tasks.

Thats an example (without Dynamic Tasks):

with DAG(
        dag_id="test_dag",
        start_date=datetime(2022, 1, 24),
        schedule_interval=None,
        render_template_as_native_obj=True,
        default_args={},
        params={
            "param2": "[email protected]",
            "sourcedir": ['/home/arya/'],
            "timenum": 0
        },
        catchup=False
) as dag:

    @task
    def make_list(lst):
        context = get_current_context()
        srcdir = []
        for number in range(0, len(lst)):
            srcdir.append(("abc" + lst[number] + "xyz"))
            taskinfo = EmptyOperator(task_id=f"taskinfo_{number}")
            taskinfo.execute(context)
        return srcdir

    make_list("{{ params.sourcedir }}")

Upvotes: 1

Related Questions