Xi12

What is task_instance.xcom_pull in Airflow?

I am trying to run EMR through Airflow and found an example that says:

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

What does job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}" mean, and what does it tell me?

Thanks, Xi

Answers (1)

Elad Kalif

In Airflow, tasks cannot share data directly, but they can share metadata. This is done through XCom: one task writes a record to the XCom table in the Airflow metadata database, and another task reads it.

task_instance.xcom_pull('create_job_flow', key='return_value') means:

  1. Go to the XCom table.
  2. Find the row that matches this DagRun and task_id='create_job_flow'.
  3. Return the value saved under key='return_value' (the key Airflow uses for whatever a task's execute() method returns).
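The three lookup steps above can be sketched as a toy in-memory table. This is not Airflow's implementation: the class ToyXComTable, its push/pull methods, the dag_id "emr_dag", the run IDs and the cluster ID "j-EXAMPLE1" are all made up for illustration. The sketch only shows that a row is located by DagRun, task_id and key.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical in-memory stand-in for Airflow's XCom table (not Airflow's API).
# Each row is identified by (dag_id, run_id, task_id, key).
XComKey = Tuple[str, str, str, str]


class ToyXComTable:
    def __init__(self) -> None:
        self._rows: Dict[XComKey, Any] = {}

    def push(self, dag_id: str, run_id: str, task_id: str, value: Any,
             key: str = "return_value") -> None:
        # A task writes a small piece of metadata for its own DagRun.
        self._rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id: str, run_id: str, task_id: str,
             key: str = "return_value") -> Optional[Any]:
        # 1. go to the table, 2. match this DagRun and task_id, 3. read the key.
        return self._rows.get((dag_id, run_id, task_id, key))


table = ToyXComTable()
# In this sketch, create_job_flow "returns" a made-up cluster ID stored under 'return_value'.
table.push("emr_dag", "run_1", "create_job_flow", "j-EXAMPLE1")
print(table.pull("emr_dag", "run_1", "create_job_flow"))  # j-EXAMPLE1
print(table.pull("emr_dag", "run_2", "create_job_flow"))  # None (a different DagRun)
```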

The {{ }} is Jinja templating syntax that means "render (print) the value of this expression here". It is needed because the value you are looking for exists only at run time: the create_job_flow task must run and save the value to the database before the add_steps task can read it.

In practice, the create_job_flow task creates an EMR cluster and saves its ID (the job flow ID) to the XCom table. The next task, add_steps, submits steps to that cluster, so it needs the cluster ID and must read (pull) it from the XCom table. The value is different for each DagRun, because each DagRun creates a new cluster.
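To see why the value only exists at run time, here is a rough simulation of two DagRuns. It assumes a hypothetical ToyTaskInstance class and a toy_render function that uses a regex plus eval() as a crude stand-in for Jinja; neither is part of Airflow, and the cluster and step IDs are invented.

```python
import re
from typing import Any, Dict, Tuple

# Toy model, not Airflow's real classes: XCom rows keyed by (run_id, task_id, key).
XCOM: Dict[Tuple[str, str, str], Any] = {}


class ToyTaskInstance:
    """Hypothetical stand-in for the task_instance object in the template context."""

    def __init__(self, run_id: str) -> None:
        self.run_id = run_id

    def xcom_push(self, task_id: str, value: Any, key: str = "return_value") -> None:
        XCOM[(self.run_id, task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Any:
        # Only rows from this DagRun are visible.
        return XCOM.get((self.run_id, task_ids, key))


def toy_render(template: str, task_instance: ToyTaskInstance) -> str:
    # Crude stand-in for Jinja: evaluate each {{ expr }} with task_instance in scope.
    # eval() keeps the sketch short; real Airflow renders templates with Jinja.
    def substitute(match: "re.Match[str]") -> str:
        return str(eval(match.group(1), {}, {"task_instance": task_instance}))
    return re.sub(r"\{\{\s*(.*?)\s*\}\}", substitute, template)


JOB_FLOW_ID = "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}"
STEP_ID = "{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}"

for run_id, cluster_id in [("run_1", "j-AAA"), ("run_2", "j-BBB")]:
    ti = ToyTaskInstance(run_id)
    # create_job_flow runs first and stores the new cluster's ID ...
    ti.xcom_push("create_job_flow", cluster_id)
    # ... so add_steps can render its job_flow_id argument at run time.
    print(run_id, toy_render(JOB_FLOW_ID, ti))
    # add_steps pushes a list of step IDs, hence the [0] in the sensor's template.
    ti.xcom_push("add_steps", [f"s-{run_id}"])
    print(run_id, toy_render(STEP_ID, ti))

# Rendering before anything was pushed for a DagRun gives "None".
print(toy_render(JOB_FLOW_ID, ToyTaskInstance("run_3")))
```

The same template string renders to a different cluster ID in each DagRun, and rendering it before create_job_flow has pushed anything yields None, which is why the templated argument is resolved when the task runs rather than when the DAG file is parsed.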
