Reputation: 606
I am creating dynamic tasks in airflow, and I need to access the xcom from those dynamic tasks.
The tasks are generated dynamically, depending on the "topic", see code below.
for topic in ["topic1", "topic2"]:
deduplicate_insert_master_job = DLISparkCreateBatchJobOperator(
task_id="deduplicate_insert_master_job_{}".format(topic),
...
)
deduplicate_insert_master_sensor = DLISparkShowBatchStateSensor(
task_id="deduplicate_insert_master_sensor_{}".format(topic),
job_id="{{ti.xcom_pull(task_ids='deduplicate_insert_master_job_{}')['id']}}".format(topic),
)
In deduplicate_insert_master_sensor task, I need to access to the name, with topic of deduplicate_insert_master_job .
How can I do?
【Have tried so far】
Thanks!
【Possible Answer】 Airflow Python Operator - Get values from xcom pull for params
"{{{{ ti.xcom_pull(task_ids='push_result_{}') }}}}".format(table_name)
"{{ ti.xcom_pull(task_ids='push_result_%s') }}" % "TABLENAME"
In my specific case, this works:
deduplicate_insert_master_sensor = DLISparkShowBatchStateSensor(
task_id="deduplicate_insert_master_sensor_{}".format(topic),
job_id = "{{ ti.xcom_pull(task_ids='deduplicate_insert_master_job_%s')['id'] }}" % topic
)
Upvotes: 0
Views: 116