harinsa

Reputation: 3196

How to parse a JSON string in an Airflow template

Is it possible to parse a JSON string inside an Airflow template?

I have an HttpSensor that monitors a job via a REST API, but the job ID is in the response of the upstream task, which has xcom_push set to True.

I would like to do something like the following, but this code fails with jinja2.exceptions.UndefinedError: 'json' is undefined

t1 = SimpleHttpOperator(
    http_conn_id="s1",
    task_id="job",
    endpoint="some_url",
    method='POST',
    data=json.dumps({ "foo": "bar" }),
    xcom_push=True,
    dag=dag,
)

t2 = HttpSensor(
    http_conn_id="s1",
    task_id="finish_job",
    endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
    response_check=lambda response: True if response.json().state == "complete" else False,
    poke_interval=5,
    dag=dag
)

t2.set_upstream(t1)

Upvotes: 9

Views: 13753

Answers (3)

Akash Sharma

Reputation: 136

Another, newer option is to set render_template_as_native_obj=True at the DAG level (available since Airflow 2.1), so that templates render to native Python objects instead of strings:

dag = DAG(
    'dagname',
    default_args=default_args,
    schedule_interval="@once",
    render_template_as_native_obj=True   # <------
)

t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)

t2 = HttpSensor(
    endpoint='job/{{ ti.xcom_pull("job")["jobId"] }}',  # <-----
    ...
)

See the Airflow documentation on rendering template fields as native Python objects for details.

Upvotes: 2

harinsa

Reputation: 3196

Alternatively, you can expose the json module to templates through user_defined_macros, after which json is available inside any template of that DAG. However, it is probably a better idea to create a plugin, as Daniel suggested.

dag = DAG(
    'dagname',
    default_args=default_args,
    schedule_interval="@once",
    user_defined_macros={
        'json': json
    }
)

Then use it in the template:

finish_job = HttpSensor(
    task_id="finish_job",
    endpoint="kue/job/{{ json.loads(ti.xcom_pull('job'))['jobId'] }}",
    response_check=lambda response: response.json()['state'] == "complete",
    poke_interval=5,
    dag=dag
)
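The response_check lambda can also be written as a named function, which is easier to test outside Airflow. This is a minimal sketch, assuming the job endpoint returns a JSON object with a "state" field as in the snippet above. The names job_is_complete and FakeResponse are illustrative only and are not part of Airflow or requests.

```python
import json


def job_is_complete(response):
    """Return True once the polled job reports state == "complete"."""
    # response.json() returns a dict, so use key lookup rather than
    # attribute access; .get() avoids a KeyError if "state" is missing.
    return response.json().get("state") == "complete"


class FakeResponse:
    """Hypothetical stand-in for requests.Response, for local testing only."""

    def __init__(self, text):
        self.text = text

    def json(self):
        return json.loads(self.text)


print(job_is_complete(FakeResponse('{"state": "complete"}')))
print(job_is_complete(FakeResponse('{"state": "running"}')))
```

Passing response_check=job_is_complete to HttpSensor would then replace the lambda.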

Upvotes: 10

Daniel Huang

Reputation: 6548

You can add a custom Jinja filter to your DAG with the user_defined_filters parameter to parse the JSON. The docs describe this parameter as:

"a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG."

dag = DAG(
    ...
    user_defined_filters={'fromjson': json.loads},
)

t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)

t2 = HttpSensor(
    endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
    ...
)
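The filter does not have to be a lambda. A named function can be checked on its own and can tolerate an XCom value that has already been parsed. The following is only a sketch under that assumption; the pass-through behaviour is an addition for illustration, not something the filter mechanism requires.

```python
import json


def fromjson(value):
    """Parse a JSON string; return already-parsed values unchanged."""
    # XCom values pulled from an HTTP task are normally response text,
    # but a dict or list may arrive if an upstream task parsed it first.
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


parsed = fromjson('{"jobId": 42}')
print(parsed["jobId"])
```

It would be registered the same way, with user_defined_filters={'fromjson': fromjson}.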

However, it may be cleaner to write your own custom JsonHttpOperator plugin (or add a flag to SimpleHttpOperator) that parses the JSON before returning it, so that you can directly reference {{ ti.xcom_pull("job")["jobId"] }} in the template.

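import json
from airflow.operators.http_operator import SimpleHttpOperator  # Airflow 1.x import path

class JsonHttpOperator(SimpleHttpOperator):

    def execute(self, context):
        # Parse the response body so the pushed XCom is a dict, not a string.
        text = super(JsonHttpOperator, self).execute(context)
        return json.loads(text)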
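The override pattern can be tried without an Airflow installation. In the sketch below, TextReturningOperator is a hypothetical stand-in for SimpleHttpOperator, and the None guard assumes the parent may return nothing when no response text is pushed.

```python
import json


class TextReturningOperator:
    """Hypothetical stand-in for SimpleHttpOperator; not an Airflow class."""

    def __init__(self, text):
        self._text = text

    def execute(self, context):
        # Mimics an operator that returns the raw response body as text.
        return self._text


class JsonOperator(TextReturningOperator):
    def execute(self, context):
        text = super().execute(context)
        # Assumption: the parent may return None when nothing is pushed.
        if text is None:
            return None
        return json.loads(text)


result = JsonOperator('{"jobId": 7}').execute(context={})
print(result["jobId"])
```

Because the returned value is already a dict, a downstream template can index it directly without a filter or macro.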

Upvotes: 18
