notacorn

Reputation: 4119

How do you push an XCom variable to an existing DAG ID?

I currently have a DAG in Airflow with a PythonOperator and an associated Python callable, like this:

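def push_xcom(**kwargs):
    # The running TaskInstance is passed in through the task context.
    ti = kwargs["ti"]

    # key and value are placeholders defined elsewhere.
    ti.xcom_push(key=key, value=value)

xcom_opr = PythonOperator(
    task_id='xcom_opr',
    python_callable=push_xcom,
    dag=dag
)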

The goal of this DAG is to update XCom variables that belong to other DAGs defined in Airflow. Is this possible? I couldn't find the source code for xcom_push; does it accept something like a dag_id argument?

Upvotes: 2

Views: 974

Answers (1)

Liam Clarke

Reputation: 385

Looking at the source code for TaskInstance, xcom_push just calls XCom.set under the hood, so it looks like you could make the same call directly and specify your desired DAG ID:

        XCom.set(
            key=key,
            value=value,
            task_id=self.task_id,
            dag_id=self.dag_id,
            execution_date=execution_date or self.execution_date)
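
As a rough sketch of that idea, the callable below calls XCom.set itself with another DAG's ID. It assumes an Airflow version whose XCom.set still takes execution_date, as in the snippet above. The names other_dag, some_task, my_key and my_value are hypothetical, and xcom_set_kwargs is an illustrative helper, not part of Airflow.

```python
def xcom_set_kwargs(key, value, target_dag_id, target_task_id, execution_date):
    """Assemble the keyword arguments used by the XCom.set call quoted above."""
    return {
        "key": key,
        "value": value,
        "task_id": target_task_id,
        "dag_id": target_dag_id,
        "execution_date": execution_date,
    }


def push_to_other_dag(**context):
    # Imported lazily so this module can be loaded without Airflow installed.
    from airflow.models import XCom

    ti = context["ti"]
    # Hypothetical target DAG, task and key; replace with your own.
    XCom.set(**xcom_set_kwargs(
        key="my_key",
        value="my_value",
        target_dag_id="other_dag",
        target_task_id="some_task",
        # Assumption: the target DAG will look the value up under this date.
        execution_date=ti.execution_date,
    ))
```

Note that the execution date you write under probably has to line up with what the other DAG's tasks later pull, so this only works cleanly if the two DAGs share a schedule or you pass the date explicitly.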

However, the xcom_pull API directly supports pulling from another DAG's XComs, so perhaps you could have the DAG you want to modify pull from the other one instead?

    def xcom_pull(
            self,
            task_ids: Optional[Union[str, Iterable[str]]] = None,
            dag_id: Optional[str] = None,
            key: str = XCOM_RETURN_KEY,
            include_prior_dates: bool = False) -> Any
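
For illustration, a callable in the DAG that needs the value could pull it like this. It is a minimal sketch using only the parameters in the signature above; source_dag and my_key are hypothetical names, and include_prior_dates=True is an assumption that the two DAGs' execution dates do not match exactly.

```python
def pull_from_other_dag(**context):
    # The running TaskInstance comes from the task context.
    ti = context["ti"]

    # Hypothetical source DAG and key; xcom_opr is the pushing task from the question.
    value = ti.xcom_pull(
        dag_id="source_dag",
        task_ids="xcom_opr",
        key="my_key",
        # Assumption: also accept values pushed under earlier execution dates.
        include_prior_dates=True,
    )
    return value
```

This keeps each DAG writing only its own XComs, which is the direction the API is designed for.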

Upvotes: 3
