BHC
BHC

Reputation: 77

How to use variables declared in Python Operator in other operators?

I have a requirement to compute a value in python operator and use it in other operators as shown below .But I'm getting "dag_var does not exist" for spark submit and email operators/

I'm declaring dag_var as a global variable in the python callable. But I'm not able to access this in other operators.

def get_dag_var(ds, **kwargs):
    global dag_var
    dag_var = kwargs['dag_run'].run_id


with DAG(
    dag_id='sample',
    schedule_interval=None, # executes at 6 AM UTC every day
    start_date=datetime(2021, 1, 1),
    default_args=default_args,
    catchup=False
) as dag:

    get_dag_var = PythonOperator(
        task_id='get_dag_id',
        provide_context=True,
        python_callable=get_dag_var)

   spark_submit = SparkSubmitOperator(application="abc".....
                                      ..
                                      application_args = [dag_var])
                                
            

    failure_notification = EmailOperator(
        task_id = "failure_notification ",
        to='[email protected]',
        subject='Workflow Failes',
        trigger_rule="one_failed",
        html_content= f""" <h3>Failure Mail - {dag_var}</h3> """
    )

    get_dag_var >> spark_submit >> failure_notification 

Any help is appreciated. Thank you.

Upvotes: 0

Views: 1703

Answers (1)

Bas Harenslak
Bas Harenslak

Reputation: 3094

You can share data between operators using XComs. In your get_dag_var function, any returned value is automatically stored as an XCom record in Airflow. You can inspect the values under Admin -> XComs.

To use an XCom value in a following task, you can apply templating:

spark_submit = SparkSubmitOperator(
    application="ABC",
    ...,
    application_args = ["{{ ti.xcom_pull(task_ids='get_dag_id') }}"],
)

The {{ }} define a templated string that is evaluated at runtime. ti.xcom_pull will "pull" the XCom value from the get_dag_id task at runtime.

One thing to note using templating: not all operator's arguments are template-able. Non-template-able arguments do not evaluate {{ }} at runtime. SparkSubmitOperator.application_args and EmailOperator.html_content are template-able, meaning a templated string is evaluated at runtime and you'll be able to provide an XCom value. Inspect the template_fields property for your operator to know which fields are template-able and which are not.

And one thing to note using XComs: be aware the XCom value is stored in the Airflow metastore, so be careful not to return huge variables which might not fit in a database record. To store XCom values in a different system than the Airflow metastore, check out custom XCom backends.

Upvotes: 2

Related Questions