qcha

Reputation: 583

How to deserialize XCom strings in Airflow?

Consider a DAG containing two tasks, Task A >> Task B (BashOperators or DockerOperators), that need to communicate through XComs.

For example, Airflow Variables can be deserialized in Jinja templates (e.g. {{ var.json.my_var.path }}). Basically, I would like to do the same thing with XComs.

Edit: a workaround is to convert the JSON string into a Python dictionary before pushing it to XCom (see below).
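To illustrate why the conversion matters: the value Task A pushes is the raw JSON text, so a key lookup cannot work on it, while the parsed dictionary can be indexed the way var.json allows. A minimal plain-Python sketch with no Airflow involved; raw_xcom is a hypothetical example value:

```python
import json

# Hypothetical last line printed by Task A and pushed to XCom as a string
raw_xcom = '{"key1": "a", "key2": "b"}'

# Indexing a str with a str key raises TypeError in plain Python
try:
    raw_xcom["key2"]
except TypeError as exc:
    print("string XCom:", exc)

# After json.loads the value is a dict and can be indexed by key
parsed = json.loads(raw_xcom)
print("dict XCom:", parsed["key2"])
```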

Upvotes: 1

Views: 3536

Answers (2)

ozs

Reputation: 3681

You can add a post_execute function to the BashOperator that deserializes the result and pushes each key as a separate XCom:

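import json

from airflow.operators.bash import BashOperator


def _post(context, result):
    """Deserialize the task's JSON output and push each key as its own XCom."""
    ti = context["ti"]
    # result is the value returned by execute(): the last line of stdout
    output = json.loads(result)
    for key, value in output.items():
        ti.xcom_push(key=key, value=value)


BashOperator(
    task_id="task_id",
    bash_command='bash command',
    post_execute=_post,  # called with (context, result) after execute()
)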

Upvotes: 1

qcha

Reputation: 583

A workaround is to create a custom operator (inherited from BashOperator or DockerOperator) and override its execute method so that it:

  1. runs the original execute method,
  2. takes its return value (the last output line of the task),
  3. tries to json.loads() it into a Python dictionary,
  4. returns the result (now a dictionary instead of a string).
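Steps 2 to 4 can be isolated in a small helper. This is a sketch; deserialize_output is a hypothetical name, and it catches only the errors json.loads is expected to raise instead of using a bare except:

```python
import json


def deserialize_output(output):
    """Return output parsed as JSON when possible, otherwise unchanged.

    Hypothetical helper mirroring steps 2-4: output is whatever the
    original execute() returned (usually the last output line, or None).
    """
    try:
        return json.loads(output)
    except (TypeError, ValueError):
        # TypeError: output is None or not a str/bytes
        # ValueError (incl. json.JSONDecodeError): output is not valid JSON
        return output
```

Narrowing the except clause keeps unrelated errors such as KeyboardInterrupt visible. Note that any valid JSON line is converted, so a task whose last line is 42 would push the integer 42 rather than the string "42".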

The Jinja template {{ ti.xcom_pull('task_a')['key2'] }} now works in Task B, since the XCom value is a Python dictionary.

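import json

from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator


class BashOperatorExtended(BashOperator):
    def execute(self, context):
        # Run the normal BashOperator; it returns the last line of stdout
        output = super().execute(context)
        try:
            # Turn a JSON string into a dict so templates can index into it
            output = json.loads(output)
        except (TypeError, ValueError):
            # Not JSON (or no output at all): keep the raw value
            pass
        return output


class DockerOperatorExtended(DockerOperator):
    def execute(self, context):
        # Run the normal DockerOperator; it returns the last log line
        output = super().execute(context)
        try:
            output = json.loads(output)
        except (TypeError, ValueError):
            pass
        return output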
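The two classes above duplicate the same logic. One way to share it is a mixin that overrides only execute and delegates to the operator through super(). This is a sketch: JsonOutputMixin is a hypothetical name, and FakeOperator below is a stand-in for BashOperator/DockerOperator so the example runs without Airflow. With Airflow you would write, for instance, class BashOperatorExtended(JsonOutputMixin, BashOperator): pass, listing the mixin first so its execute runs before the operator's.

```python
import json


class JsonOutputMixin:
    """Parse the wrapped operator's return value as JSON when possible."""

    def execute(self, context):
        # Delegate to the next class in the MRO (the real operator)
        output = super().execute(context)
        try:
            return json.loads(output)
        except (TypeError, ValueError):
            return output


class FakeOperator:
    """Hypothetical stand-in for BashOperator/DockerOperator."""

    def __init__(self, line):
        self.line = line

    def execute(self, context):
        return self.line


class FakeOperatorExtended(JsonOutputMixin, FakeOperator):
    pass


print(FakeOperatorExtended('{"key2": "b"}').execute({}))  # {'key2': 'b'}
print(FakeOperatorExtended("not json").execute({}))  # not json
```

This still requires subclassing each operator, but the parsing code lives in one place.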

But creating a new operator just for this purpose is not very satisfying.

Upvotes: 2
