Reputation: 3196
Is it possible to parse a JSON string inside an Airflow template?
I have an HttpSensor which monitors a job via a REST API, but the job id is in the response of the upstream task, which has xcom_push set to True.
I would like to do something like the following. However, this code gives the error jinja2.exceptions.UndefinedError: 'json' is undefined:
t1 = SimpleHttpOperator(
    http_conn_id="s1",
    task_id="job",
    endpoint="some_url",
    method='POST',
    data=json.dumps({ "foo": "bar" }),
    xcom_push=True,
    dag=dag,
)
t2 = HttpSensor(
    http_conn_id="s1",
    task_id="finish_job",
    endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
    # response.json() returns a dict, so the state is read by key
    response_check=lambda response: response.json()["state"] == "complete",
    poke_interval=5,
    dag=dag
)
t2.set_upstream(t1)
Upvotes: 9
Views: 13753
Reputation: 136
Another, newer way to achieve this is to set render_template_as_native_obj=True at the DAG level:
dag = DAG(
    'dagname',
    default_args=default_args,
    schedule_interval="@once",
    render_template_as_native_obj=True  # <------
)
t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)
t2 = HttpSensor(
    endpoint='job/{{ ti.xcom_pull("job")["jobId"] }}',  # <-----
    ...
)
Here's the reference from Airflow docs!
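As a rough illustration of what native rendering changes, the sketch below uses Jinja2's NativeEnvironment directly, outside Airflow (to my understanding, this is the kind of environment Airflow switches to when the flag is set). The payload value is a made-up stand-in for an XCom value and is assumed to already be a dict rather than a JSON string.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

# Hypothetical XCom value; assumed to already be a dict, not a JSON string.
payload = {"jobId": 42}

# Default environment: every rendered result is a string.
plain_value = Environment().from_string("{{ payload }}").render(payload=payload)

# Native environment: a template that is a single expression keeps its Python type.
native_value = NativeEnvironment().from_string("{{ payload }}").render(payload=payload)

# Text mixed with an expression still renders to a plain string.
endpoint = (
    NativeEnvironment()
    .from_string('job/{{ payload["jobId"] }}')
    .render(payload=payload)
)

print(type(plain_value).__name__, type(native_value).__name__, endpoint)
```

Note that the subscript in the endpoint template relies on the pulled value being a dict in the first place.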
Upvotes: 2
Reputation: 3196
Alternatively, it is also possible to expose the json module to templates by passing it in user_defined_macros when creating the DAG; json is then available inside the template. However, it is probably a better idea to create a plugin, as Daniel suggested.
dag = DAG(
    'dagname',
    default_args=default_args,
    schedule_interval="@once",
    user_defined_macros={
        'json': json
    }
)
then
finish_job = HttpSensor(
    task_id="finish_job",
    endpoint="kue/job/{{ json.loads(ti.xcom_pull('job'))['jobId'] }}",
    response_check=lambda response: response.json()['state'] == "complete",
    poke_interval=5,
    dag=dag
)
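To see why this helps, here is a minimal sketch in plain Jinja2, outside Airflow: a name is only usable in a template if it is part of the render context, which is what user_defined_macros adds to. The payload string and the render_endpoint helper are hypothetical, standing in for the XCom value and for Airflow's own rendering.

```python
import json

from jinja2 import Template
from jinja2.exceptions import UndefinedError

# Hypothetical response body standing in for the value pushed by the "job" task.
xcom_value = '{"jobId": 42, "state": "queued"}'

template = Template("kue/job/{{ json.loads(payload)['jobId'] }}")


def render_endpoint(**extra_context):
    """Render the endpoint with the payload plus any extra context names."""
    return template.render(payload=xcom_value, **extra_context)


# Without json in the context, rendering fails as in the question.
try:
    render_endpoint()
    error_message = None
except UndefinedError as exc:
    error_message = str(exc)

# With json passed in (the role user_defined_macros plays), it renders.
endpoint = render_endpoint(json=json)

print(error_message)
print(endpoint)
```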
Upvotes: 10
Reputation: 6548
You can add a custom Jinja filter to your DAG with the user_defined_filters parameter to parse the JSON. The documentation describes it as:
"a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG."
dag = DAG(
    ...
    user_defined_filters={'fromjson': lambda s: json.loads(s)},
)
t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)
t2 = HttpSensor(
    endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
    ...
)
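The filter expression can be tried out in plain Jinja2 before wiring it into a DAG. This is only a sketch: the filter is registered on a bare Environment instead of through user_defined_filters, and the payload string is a hypothetical stand-in for ti.xcom_pull("job").

```python
import json

from jinja2 import Environment

env = Environment()
# Register the same kind of filter the DAG example passes via user_defined_filters.
env.filters["fromjson"] = json.loads

# Hypothetical response text standing in for ti.xcom_pull("job").
payload = '{"jobId": 42}'

template = env.from_string('job/{{ (payload | fromjson)["jobId"] }}')
endpoint = template.render(payload=payload)

print(endpoint)
```

The parentheses around the filtered value matter: the filter has to run before the ["jobId"] subscript is applied.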
However, it may be cleaner to just write your own custom JsonHttpOperator plugin (or add a flag to SimpleHttpOperator) that parses the JSON before returning, so that you can directly reference {{ ti.xcom_pull("job")["jobId"] }} in the template.
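class JsonHttpOperator(SimpleHttpOperator):
    def execute(self, context):
        # The parent returns the response body as text; parse it so the
        # value pushed to XCom is a dict rather than a JSON string.
        text = super(JsonHttpOperator, self).execute(context)
        return json.loads(text)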
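The override pattern can be checked without an Airflow installation by swapping in a stub base class. In the sketch below, StubHttpOperator is a hypothetical stand-in for SimpleHttpOperator that simply returns a canned response body from execute(); everything about it is an assumption made for illustration.

```python
import json


class StubHttpOperator:
    """Hypothetical stand-in for SimpleHttpOperator: execute() returns response text."""

    def __init__(self, response_text):
        self.response_text = response_text

    def execute(self, context):
        return self.response_text


class JsonHttpOperator(StubHttpOperator):
    def execute(self, context):
        # Parse the parent's text result so downstream code receives a dict.
        text = super().execute(context)
        return json.loads(text)


result = JsonHttpOperator('{"jobId": 42}').execute(context={})
print(result["jobId"])
```

With the parsed dict as the return value, a downstream template can index the pulled value directly instead of parsing it inside Jinja.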
Upvotes: 18