Amrit Jangid
Amrit Jangid

Reputation: 188

airflow pool value from xcom at runtime

There are three pools low, medium and high and are configured with a slot of 1, 2 and 3.

Now, task1 set xcom value of pool based on some calculation and task2 needs be scheduled in that pool.

Sample code -

dag = DAG("mongo-connection-test")

def test(**kwargs):
    # some condition to set pool value
    pool = "high" 
    kwargs['ti'].xcom_push(key="pool", value=pool)

task1 = PythonOperator(task_id="set_xcom",
                       python_callable=test,
                       xcom_push=True, 
                       provide_context=True,
                       dag=dag)

task2 = BashOperator(
                   task_id="test",
                   bash_command="echo Hello !!",
                   dag=dag,
                   pool='{{ ti.xcom_pull(task_ids="set_xcom", key="pool") }}',
                   provide_context=True)
task1 >> task2

But task2 is unable to pull value from xcom and scheduler will fail to execute saying no poolexist '{{ ti.xcom_pull(task_ids="test", key="pool") }}'. Need help in fixing this?

Upvotes: 3

Views: 561

Answers (1)

Christopher Beck
Christopher Beck

Reputation: 765

I recommend you read the documentation about Jinja Templating in Airflow. If you want to use it, try to look at the template_fields in the source code first (e.g. BashOperator)

Since pool is not in that list, what you are trying to do won't work.

Therefore, I would recommend you to create your task 3 times with the 3 different pool settings and use a BranchPythonOperator to select which of the 3 tasks should run (you can pull from XCom in the BranchPythonOperator)

Your DAG should probably look like this:

task1 >> branch_task >> [task2_low, task2_medium, task2_high]

Upvotes: 1

Related Questions