David Sánchez

Reputation: 606

Airflow - How to get XCOM from dynamic task

I am creating tasks dynamically in Airflow, and I need to pull the XCom value pushed by each of those dynamic tasks.

One pair of tasks is generated per "topic", as in the code below.

for topic in ["topic1", "topic2"]:

    deduplicate_insert_master_job = DLISparkCreateBatchJobOperator(
        task_id="deduplicate_insert_master_job_{}".format(topic),
        ...
    )


    deduplicate_insert_master_sensor = DLISparkShowBatchStateSensor(
        task_id="deduplicate_insert_master_sensor_{}".format(topic),
        job_id="{{ti.xcom_pull(task_ids='deduplicate_insert_master_job_{}')['id']}}".format(topic),
    )

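In each deduplicate_insert_master_sensor task, I need to pull the XCom pushed by the deduplicate_insert_master_job task for the same topic, so the task_ids value inside the template has to include the topic name.

How can I do this?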

【Have tried so far】

  1. Airflow - How to pass xcom variable into Python function
  2. Apache airflow xcom for variable task id

Thanks!

【Possible Answer】 Airflow Python Operator - Get values from xcom pull for params

"{{{{ ti.xcom_pull(task_ids='push_result_{}') }}}}".format(table_name)
"{{ ti.xcom_pull(task_ids='push_result_%s') }}" % "TABLENAME"

In my specific case, this works:

    deduplicate_insert_master_sensor = DLISparkShowBatchStateSensor(
        task_id="deduplicate_insert_master_sensor_{}".format(topic),
        job_id="{{ ti.xcom_pull(task_ids='deduplicate_insert_master_job_%s')['id'] }}" % topic,
    )
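
As far as I can tell, my original attempt failed because str.format treats "{{" and "}}" as escaped literal braces, so the Jinja delimiters collapse to single braces before Airflow ever sees the template. The sketch below uses plain Python only (no Airflow needed) and compares the three string-building approaches. The variable names broken, escaped and percent are just for illustration.

```python
topic = "topic1"

# str.format collapses "{{" and "}}" into single braces, so this string
# no longer contains Jinja delimiters for Airflow to render.
broken = "{{ti.xcom_pull(task_ids='deduplicate_insert_master_job_{}')['id']}}".format(topic)

# Doubling the braces again ("{{{{" and "}}}}") leaves "{{" and "}}" in the result.
escaped = "{{{{ ti.xcom_pull(task_ids='deduplicate_insert_master_job_{}')['id'] }}}}".format(topic)

# %-formatting does not treat braces specially, so the delimiters pass through untouched.
percent = "{{ ti.xcom_pull(task_ids='deduplicate_insert_master_job_%s')['id'] }}" % topic

print(broken)
print(escaped)
print(percent)
```

The second and third strings come out identical and keep the double braces, which is why either of them can be passed to a templated field such as job_id.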

Upvotes: 0

Views: 116
