Reputation: 1223
I am trying to run EMR through Airflow and found example where it says
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
)
What does job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}" mean?
What does this tell me?
Thanks, Xi
Upvotes: 3
Views: 10369
Reputation: 16079
In Airflow, tasks cannot share data directly, but they can share metadata. This is done by one task writing a record to the XCom table in the database while another task reads it.
task_instance.xcom_pull('create_job_flow', key='return_value')
means:
task_id='create_job_flow'
key='return_value'
The {{ }}
is Jinja templating syntax, meaning "print" the value here. It is needed because the value you are seeking exists only at run time: the create_job_flow
task must run and save the value to the database before the add_steps
task can read it.
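To make the push/pull mechanics concrete, here is a toy model, not real Airflow code: a plain dict stands in for the XCom table, two functions stand in for the tasks, and the job flow id shown is a made-up example value.

```python
# Toy stand-in for Airflow's XCom table: (task_id, key) -> value.
xcom_table = {}

def xcom_push(task_id, key, value):
    # Airflow stores an operator's return value under the key 'return_value'.
    xcom_table[(task_id, key)] = value

def xcom_pull(task_ids, key='return_value'):
    # Raises KeyError if the upstream task has not run yet,
    # which is why task ordering matters.
    return xcom_table[(task_ids, key)]

# 'create_job_flow' runs first and pushes the cluster id (hypothetical value)...
xcom_push('create_job_flow', 'return_value', 'j-2AXXXXXXGAPLF')

# ...so 'add_steps' can pull it later, when its template is rendered.
job_flow_id = xcom_pull('create_job_flow', key='return_value')
print(job_flow_id)  # → j-2AXXXXXXGAPLF
```

In real Airflow the same dependency is enforced by the DAG: create_job_flow must be upstream of add_steps, or the pull returns nothing.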
In practice this means that the create_job_flow
task creates an EMR cluster and saves the cluster/machine id to the XCom table. The next task is add_steps,
which submits steps to that cluster - for that you need the cluster id, so you must read (pull) the value from the XCom table. The value will be different per DagRun, as each DagRun creates a new cluster.
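The same logic explains the [0] in the sensor's step_id template: EmrAddStepsOperator returns a list of step ids (one per submitted step), and that whole list lands in XCom under 'return_value', so the sensor picks the first entry. A small sketch with made-up step ids:

```python
# Toy illustration of why the sensor's template ends in [0].
# EmrAddStepsOperator returns a *list* of step ids (one per submitted step);
# the ids below are hypothetical example values.
xcom_table = {
    ('add_steps', 'return_value'): ['s-1ABCXXXXXXX', 's-2DEFXXXXXXX'],
}

# EmrStepSensor watches a single step, so its template pulls the list
# and indexes the first element.
step_id = xcom_table[('add_steps', 'return_value')][0]
print(step_id)  # → s-1ABCXXXXXXX
```

If you submitted several steps and wanted to watch all of them, you would need one sensor per index.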
Upvotes: 4