Reputation: 188
There are three pools low
, medium
and high
and are configured with a slot of 1, 2 and 3.
Now, task1
set xcom value of pool
based on some calculation and task2
needs be scheduled in that pool.
Sample code -
dag = DAG("mongo-connection-test")
def test(**kwargs):
# some condition to set pool value
pool = "high"
kwargs['ti'].xcom_push(key="pool", value=pool)
task1 = PythonOperator(task_id="set_xcom",
python_callable=test,
xcom_push=True,
provide_context=True,
dag=dag)
task2 = BashOperator(
task_id="test",
bash_command="echo Hello !!",
dag=dag,
pool='{{ ti.xcom_pull(task_ids="set_xcom", key="pool") }}',
provide_context=True)
task1 >> task2
But task2 is unable to pull value from xcom and scheduler will fail to execute saying no poolexist '{{ ti.xcom_pull(task_ids="test", key="pool") }}'
.
Need help in fixing this?
Upvotes: 3
Views: 561
Reputation: 765
I recommend you read the documentation about Jinja Templating in Airflow.
If you want to use it, try to look at the template_fields
in the source code first (e.g. BashOperator)
Since pool
is not in that list, what you are trying to do won't work.
Therefore, I would recommend you to create your task 3 times with the 3 different pool settings and use a BranchPythonOperator
to select which of the 3 tasks should run (you can pull from XCom in the BranchPythonOperator
)
Your DAG should probably look like this:
task1 >> branch_task >> [task2_low, task2_medium, task2_high]
Upvotes: 1