user7983290

Dag run not found when unit testing a custom operator in Airflow

I've written a custom operator (DataCleaningOperator), which corrects JSON data based on a provided schema.

The unit tests worked previously, when I didn't have to instantiate a TaskInstance and provide the operator with a context. However, I recently updated the operator to take in a context (so that it can use xcom_push).

Here is an example of one of the tests:

DEFAULT_DATE = datetime.today()

class TestDataCleaningOperator(unittest.TestCase):    
    """
    Class to execute unit tests for the operator 'DataCleaningOperator'.
    """
    def setUp(self) -> None:
        super().setUp()
        self.dag = DAG(
            dag_id="test_dag_data_cleaning",
            schedule_interval=None,
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
                "output_to_xcom": True,
            },
        )
        self._initialise_test_data()

    def _initialise_test_data(self) -> None:
        # Test data set here as class variables such as self.test_data_correct
        ...

    def test_operator_cleans_dataset_which_matches_schema(self) -> None:
        """
        Test: Attempt to clean a dataset which matches the provided schema.
        Verification: Returns the original dataset, unchanged.
        """
        task = DataCleaningOperator(
            task_id="test_operator_cleans_dataset_which_matches_schema",
            schema_fields=self.test_schema_nest,
            data_file_object=deepcopy(self.test_data_correct),
            dag=self.dag,
        )
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        result: List[dict] = task.execute(ti.get_template_context())
        self.assertEqual(result, self.test_data_correct)

However, when the tests are run, the following error is raised:

airflow.exceptions.DagRunNotFound: DagRun for 'test_dag_data_cleaning' with date 2022-02-22 12:09:51.538954+00:00 not found

This is related to the line in which a task instance is instantiated in test_operator_cleans_dataset_which_matches_schema.

Why can't Airflow locate the test_dag_data_cleaning DAG? Is there a specific configuration I've missed? Do I also need to create a DAG run instance, or add the DAG to the DagBag manually, if this test DAG is outside of my standard DAG directory? All normal (non-test) DAGs in my DAG directory run correctly.

In case it helps, my current Airflow version is 2.2.3 and the structure of my project is:

airflow
├─ dags
├─ plugins
|  ├─ ...
|  └─ operators
|     ├─ ...
|     └─ data_cleaning_operator.py
|
└─ tests
   ├─ ...
   └─ operators
      └─ test_data_cleaning_operator.py

Upvotes: 7

Views: 3419

Answers (1)

anzuman farhana

Reputation: 226

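The code you have written uses the Airflow 2.0 style of unit test. From Airflow 2.2 onward a task instance is tied to a DAG run, so constructing a TaskInstance with only an execution_date triggers a lookup for a matching DagRun and raises DagRunNotFound if none exists. After upgrading to Airflow 2.2.3, the unit test therefore has to create a DAG run before it obtains the task instance.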

Below is the sample code which worked for me:

import unittest

import pendulum
from airflow import DAG
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

from operators.test_operator import EvenNumberCheckOperator

DEFAULT_DATE = pendulum.datetime(2022, 3, 4, tz='America/Toronto')
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"


class TestEvenNumberCheckOperator(unittest.TestCase):

    def setUp(self):
        super().setUp()
        self.dag = DAG(TEST_DAG_ID, default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})
        self.even = 10
        self.odd = 11
        EvenNumberCheckOperator(
            task_id=TEST_TASK_ID,
            my_operator_param=self.even,
            dag=self.dag
        )


    def test_even(self):
        """Tests that the EvenNumberCheckOperator returns True for 10."""
        # Create the DAG run first so that a task instance exists for the task.
        dagrun = self.dag.create_dagrun(
            state=DagRunState.RUNNING,
            execution_date=DEFAULT_DATE,
            start_date=DEFAULT_DATE,
            run_type=DagRunType.MANUAL,
        )
        ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
        # Attach the operator so that get_template_context() can build the context.
        ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
        result = ti.task.execute(ti.get_template_context())
        assert result is True
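
If the operator only needs the context in order to push to XCom, a lighter option may be to skip the DAG run and pass a mocked context instead. The sketch below assumes the operator reaches XCom through context["ti"]. FakeCleaningOperator, run_with_mock_context and the "cleaned_data" key are hypothetical stand-ins so that the snippet runs without Airflow installed; a real operator would subclass BaseOperator.

```python
from typing import Any, Dict, List
from unittest import mock


class FakeCleaningOperator:
    """Hypothetical stand-in for a custom operator (not an Airflow class)."""

    def __init__(self, data: List[dict]) -> None:
        self.data = data

    def execute(self, context: Dict[str, Any]) -> List[dict]:
        # Push the result to XCom through the task instance held in the context.
        context["ti"].xcom_push(key="cleaned_data", value=self.data)
        return self.data


def run_with_mock_context(operator: FakeCleaningOperator) -> mock.MagicMock:
    """Execute the operator with a context whose task instance is a mock."""
    ti = mock.MagicMock()
    operator.execute({"ti": ti})
    return ti


# The mock records the call, so the test can assert on what was pushed.
ti = run_with_mock_context(FakeCleaningOperator([{"id": 1}]))
ti.xcom_push.assert_called_once_with(key="cleaned_data", value=[{"id": 1}])
```

This keeps the test away from the metadata database, but it only checks the operator's own logic. Anything that depends on a real template context (rendered fields, run IDs and so on) still needs the DAG run approach shown above.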

Upvotes: 7
