amadorschulze92

Reputation: 136

Pulling xcom from sub dag

I have a main DAG (main_dag) that contains several subdags, each with a number of tasks. I push an XCom from taskA in subdagA and need to pull it in taskB in subdagB. Because the dag_id argument of xcom_pull() defaults to self.dag_id, I have been unable to pull that XCom from inside subdagB. How would one do this, and is there a better way to set this scenario up so I don't have to deal with the problem at all?

Example of what I am currently doing in subdagB:

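# Airflow 1.x import paths (xcom_push=True below is the 1.x BashOperator argument)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

def subdagB(parent_dag, child_dag, start_date, schedule_interval):
    # A subdag's dag_id has the form '<parent_dag_id>.<child_dag_id>'
    subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval)
    start = DummyOperator(
        task_id='taskA',
        dag=subdagB)
    # Templated command that pulls the XCom pushed by taskA in subdagA
    tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};'''
    t1 = BashOperator(
        task_id='taskB',
        bash_command=tag_db_template,
        xcom_push=True,
        dag=subdagB)
    end = DummyOperator(
        task_id='taskC',
        dag=subdagB)
    # Chain the tasks: start -> t1 -> end
    t1.set_upstream(start)
    end.set_upstream(t1)
    return subdagB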

Thank you in advance for any help!

Upvotes: 10

Views: 7065

Answers (1)

gnicholas

Reputation: 2087

You should be fine as long as you override the dag_id in

  • [Operator].xcom_pull(dag_id=dag_id, ...) or
  • [TaskInstance].xcom_pull(dag_id=dag_id, ...)

Just make sure that

dag_id = "{parent_dag_id}.{child_dag_id}"
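As a rough sketch of that naming rule applied to the question's template: the helper names below (subdag_id, pull_template) are made up for illustration and are not part of Airflow, and the parent is assumed to be called main_dag as in the question. It only builds the string you would pass as bash_command, so it runs without Airflow installed.

```python
def subdag_id(parent_dag_id, child_dag_id):
    """Return the full dag_id of a subdag: '<parent_dag_id>.<child_dag_id>'."""
    return "{}.{}".format(parent_dag_id, child_dag_id)


def pull_template(parent_dag_id, child_dag_id, task_id):
    """Build a Jinja snippet that pulls an XCom pushed by a task in another subdag.

    Doubled braces are str.format escapes, so the result contains literal '{{ ... }}'.
    """
    return (
        "echo {{{{ task_instance.xcom_pull("
        "dag_id='{dag_id}', task_ids='{task_id}') }}}};"
    ).format(dag_id=subdag_id(parent_dag_id, child_dag_id), task_id=task_id)


# Hypothetical usage: pull taskA's XCom from subdagA while running inside subdagB.
tag_db_template = pull_template("main_dag", "subdagA", "taskA")
print(tag_db_template)
```

The resulting string can be handed to the BashOperator in subdagB unchanged; the only difference from the question's template is that the dag_id is exactly the parent and child ids joined by a dot.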

If you can make your example more complete, I can try running it locally, but I tested a similar example and cross-subdag XComs worked as expected.
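For the TaskInstance route, a minimal sketch of a Python callable might look like the following. It assumes the callable receives the task instance under the "ti" context key (in Airflow 1.x that needs provide_context=True on the PythonOperator). FakeTaskInstance and the value "tag_db_v1" are hypothetical stand-ins so the snippet runs without Airflow; they are not the test I ran.

```python
def pull_from_sibling_subdag(**context):
    """Pull the XCom that taskA pushed in subdagA, from a task in another subdag."""
    ti = context["ti"]
    # Override dag_id explicitly; otherwise xcom_pull looks in the current dag.
    return ti.xcom_pull(dag_id="main_dag.subdagA", task_ids="taskA")


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, store):
        # store maps (dag_id, task_id, key) -> pushed value
        self._store = store

    def xcom_pull(self, task_ids=None, dag_id=None, key="return_value"):
        return self._store.get((dag_id, task_ids, key))


# Made-up XCom value standing in for whatever taskA actually pushed.
store = {("main_dag.subdagA", "taskA", "return_value"): "tag_db_v1"}
result = pull_from_sibling_subdag(ti=FakeTaskInstance(store))
print(result)
```

In a real DAG the same callable would be passed as python_callable to a PythonOperator in subdagB, with Airflow supplying the actual task instance.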

Upvotes: 17
