Reputation: 583
Consider a DAG containing two tasks, `Task A >> Task B` (BashOperators or DockerOperators), which need to communicate through XComs.
Task A outputs its information as a one-line JSON string on stdout. That line can be retrieved in the logs of Task A, and therefore in its `return_value` XCom key if `xcom_push=True`. For instance: `{"key1":1,"key2":3}`
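
A minimal sketch, assuming Task A's command runs a Python script (the script and its `emit_result` helper are hypothetical): the payload is printed as compact single-line JSON so that it is the last line of stdout, which is the line BashOperator pushes as `return_value`.

```python
import json
import sys


def emit_result(payload: dict) -> str:
    """Hypothetical helper: print payload as one-line JSON and return that line."""
    # separators=(",", ":") drops the spaces json.dumps inserts by default;
    # without indent=..., json.dumps never emits a newline.
    line = json.dumps(payload, separators=(",", ":"))
    print(line, file=sys.stdout)
    return line


if __name__ == "__main__":
    # Print the result last, so it is the line that becomes return_value.
    emit_result({"key1": 1, "key2": 3})
```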
Task B only needs the `key2` value from Task A, so we need to deserialize the `return_value` XCom of Task A, extract only this value, and pass it directly to Task B using the Jinja template `{{ xcom_pull('task_a')['key2'] }}`. Used as-is, this fails with `jinja2.exceptions.UndefinedError: 'str object' has no attribute 'key2'`, because `return_value` is just a string.
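
A simplified reproduction in plain Python shows the underlying issue. It uses no Airflow or Jinja, so the error type differs: Python raises `TypeError` where Jinja reports `UndefinedError`. The pulled value is a `str` and only supports key lookup after `json.loads`. `raw` is assumed to be exactly what Task A pushed.

```python
import json

# What Task A pushed as its return_value XCom: a plain string, not a dict.
raw = '{"key1":1,"key2":3}'

try:
    raw["key2"]  # str indices must be integers, so this raises TypeError
except TypeError as exc:
    print(f"string lookup failed: {exc}")

# Deserializing first turns the string into a dict, so key access works.
parsed = json.loads(raw)
print(parsed["key2"])
```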
For example, Airflow Variables can be deserialized in Jinja templates (e.g. `{{ var.json.my_var.path }}`). I would like to do the same thing with XComs.
Edit: a workaround is to convert the JSON string into a Python dictionary before sending it to XCom (see below).
Upvotes: 1
Views: 3536
Reputation: 3681
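You can add a `post_execute` callable to the BashOperator that deserializes the result and pushes each key as a separate XCom:
```python
import json

from airflow.operators.bash import BashOperator  # Airflow 2.x import path


def _post(context, result):
    # result is the task's return value, i.e. the last line of stdout.
    ti = context["ti"]
    output = json.loads(result)
    # Push every key of the JSON object as its own XCom.
    for key, value in output.items():
        ti.xcom_push(key, value)


BashOperator(
    task_id="task_id",
    bash_command="bash command",
    post_execute=_post,
)
```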
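
To check the hook's logic without a running Airflow instance, it can be called with a stub context. `FakeTaskInstance` below is a hypothetical stand-in that only records `xcom_push(key, value)` calls; in a real run Airflow supplies the task instance under `context["ti"]`.

```python
import json


class FakeTaskInstance:
    """Hypothetical stub: records xcom_push calls instead of writing to Airflow's DB."""

    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


def _post(context, result):
    """Same logic as the post_execute hook: push each JSON key as its own XCom."""
    ti = context["ti"]
    output = json.loads(result)
    for key, value in output.items():
        ti.xcom_push(key, value)


ti = FakeTaskInstance()
_post({"ti": ti}, '{"key1":1,"key2":3}')
print(ti.pushed)
```

Once the keys are pushed separately, Task B can pull just the one it needs, e.g. `{{ ti.xcom_pull(task_ids='task_id', key='key2') }}`.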
Upvotes: 1
Reputation: 583
A workaround is to create a custom operator (inherited from BashOperator or DockerOperator) and override its `execute` method so that it runs the original `execute` method, deserializes the output with `json.loads()`, and returns it as a Python dictionary. The previous Jinja template `{{ xcom_pull('task_a')['key2'] }}` now works in Task B, since the XCom value is now a Python dictionary.
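```python
import json

# Airflow 2.x import paths
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator


class BashOperatorExtended(BashOperator):
    def execute(self, context):
        # Run the normal BashOperator; its result is the last line of stdout.
        output = super().execute(context)
        try:
            # Turn a JSON string into a dict so templates can index it.
            output = json.loads(output)
        except (TypeError, ValueError):
            # Not JSON (or no output at all): keep the raw value.
            pass
        return output


class DockerOperatorExtended(DockerOperator):
    def execute(self, context):
        output = super().execute(context)
        try:
            output = json.loads(output)
        except (TypeError, ValueError):
            pass
        return output
```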
But creating a new operator just for that purpose is not really satisfying.
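
If you do go the subclass route, one way to avoid writing the same `execute` override twice is a small mixin listed before the operator in the base classes, so that `super().execute()` resolves to the operator's own `execute`. This is a sketch under assumptions: `JsonOutputMixin` is a hypothetical name, and `EchoOperator` is a hypothetical stand-in for BashOperator/DockerOperator so the example runs without Airflow. With Airflow, assuming the operator classes cooperate with ordinary Python multiple inheritance, you would write e.g. `class BashOperatorExtended(JsonOutputMixin, BashOperator): pass`.

```python
import json


class JsonOutputMixin:
    """Hypothetical mixin: parse the wrapped operator's output as JSON when possible."""

    def execute(self, context):
        output = super().execute(context)
        try:
            return json.loads(output)
        except (TypeError, ValueError):
            # Not JSON (or None): keep the raw value unchanged.
            return output


class EchoOperator:
    """Hypothetical stand-in for BashOperator: returns a fixed 'last stdout line'."""

    def __init__(self, last_line):
        self.last_line = last_line

    def execute(self, context):
        return self.last_line


class EchoOperatorExtended(JsonOutputMixin, EchoOperator):
    pass


print(EchoOperatorExtended('{"key1":1,"key2":3}').execute({}))
print(EchoOperatorExtended("plain text").execute({}))
```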
Upvotes: 2