Bazilio

Reputation: 15

Passing Airflow DAG Configuration Values to GreatExpectationsOperator Tasks

I have Airflow DAGs set up to run Great Expectations' checkpoints.yml alongside corresponding expectations.json files. These DAGs work well for full Data Quality tests.

Now, I'm in need of a DAG that can be triggered with configurations, such as {"load_start_date": "2023-07-01"}, for incremental DQ tests. When running full tests, I intend to omit configurations.

My approach involves using an environment variable whose value changes based on the passed configuration. I attempted to use the Jinja templating syntax {{ dag_run.conf.get('key', 'default_value') }} when declaring this environment variable. However, since Airflow only renders Jinja inside an operator's templated fields at task run time (top-level DAG code runs at parse time), the raw {{ ... }} expression is passed through to the checkpoint. Here are the relevant parts of my DAG code:

# Necessary imports

# Setting environment variables

os.environ["sf_user"] = snowflake_credentials["snowflake_user"] 
os.environ["sf_pass"] = snowflake_credentials["snowflake_password"]
os.environ["sf_host"] = Variable.get("snowflake_host")
os.environ["env_name"] = Variable.get("environment")

# Setting the environment variable "load_start_date"

os.environ["load_start_date"] = "{{ dag_run.conf.get('load_start_date', '1900-01-01') }}"

# DAG definition with GreatExpectationsOperator tasks

with DAG(
        dag_id="ge_dag_name",
        start_date=datetime(2021, 12, 15),
        catchup=False,
        schedule_interval=None
) as dag:
    task_GE_c_hub_tbl_name = GreatExpectationsOperator(
        task_id="task_GE_c_hub_tbl_name",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="sf.edw.c_hub_tbl_name",
        trigger_rule="all_done",
        fail_task_on_validation_failure=True, 
        return_json_dict=True,
    )
    # Additional tasks...
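To illustrate, the environment-variable assignment above just stores the literal template text; nothing ever renders it (a minimal, Airflow-free sketch):

```python
import os

# Top-level DAG code runs at parse time, so the Jinja expression below is
# never rendered -- it is stored as a plain string.
os.environ["load_start_date"] = "{{ dag_run.conf.get('load_start_date', '1900-01-01') }}"

print(os.environ["load_start_date"])
# prints the raw template text, not a date
```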

Here is a fragment of my Great Expectations checkpoint:

    ....
    runtime_parameters:
        query: "
        SELECT * FROM tbl_name
        WHERE load_datetime >= $load_start_date :: DATE 
        "
    ...
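For context: Great Expectations substitutes $VAR / ${VAR} references in checkpoint YAML from environment variables when the checkpoint runs. A rough stdlib analogue of that substitution (the value here is hypothetical, as if set before the checkpoint ran):

```python
import os
from string import Template

# Hypothetical value, as if set by the DAG before the checkpoint runs
os.environ["load_start_date"] = "2023-07-01"

# string.Template uses the same $name placeholder style as the checkpoint query
query = Template("SELECT * FROM tbl_name WHERE load_datetime >= '$load_start_date' :: DATE")
print(query.substitute(os.environ))
# → SELECT * FROM tbl_name WHERE load_datetime >= '2023-07-01' :: DATE
```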

While the value of os.environ["load_start_date"] successfully reaches the checkpoint's SQL, the Jinja templating syntax is not being interpreted as expected. Instead of obtaining the desired date values like '2023-07-01' or '1900-01-01', I'm encountering the raw Jinja template expression in my SQL query:

WHERE load_datetime >= '{{ dag_run.conf.get('load_start_date', '1900-01-01') }}' :: DATE

I'm looking for an alternative solution to pass the DAG configuration value to GreatExpectationsOperator tasks. Any guidance would be appreciated.

Upvotes: 1

Views: 359

Answers (1)

ozs

Reputation: 3681

The problem is that you set os.environ["load_start_date"] when the DAG is parsed by Airflow, not when the DAG runs.

If you want to update load_start_date at runtime, you can add a task that does this and runs before the GreatExpectationsOperator:

# from airflow.decorators import task

@task()
def set_load_start_date(dag_run=None):
    # dag_run is injected by Airflow at run time, so conf is available here
    os.environ["load_start_date"] = dag_run.conf.get('load_start_date', '1900-01-01')

set_load_start_date() >> task_GE_c_hub_tbl_name

** That said, I think it's not good practice to use an environment variable, because it affects all DAG runs that read it. It's better to use checkpoint_kwargs, which is a templated field of GreatExpectationsOperator, so the Jinja expression is rendered at run time:

task_GE_c_hub_tbl_name = GreatExpectationsOperator(
    task_id="task_GE_c_hub_tbl_name",
    data_context_root_dir=ge_root_dir,
    checkpoint_name="sf.edw.c_hub_tbl_name",
    trigger_rule="all_done",
    fail_task_on_validation_failure=True, 
    return_json_dict=True,
    checkpoint_kwargs={"load_start_date" : "{{ dag_run.conf.get('load_start_date', '1900-01-01') }}" }
)

Upvotes: 0
