Reputation:
I've written a custom operator (DataCleaningOperator), which corrects JSON data based on a provided schema.
The unit tests previously worked when I didn't have to instantiate a TaskInstance and provide the operator with a context. However, I've recently updated the operator to take in a context (so that it can use xcom_push).
Here is an example of one of the tests:
import unittest
from copy import deepcopy
from datetime import datetime
from typing import List

from airflow import DAG
from airflow.models import TaskInstance

# Import path based on my project layout (plugins dir is on the path).
from operators.data_cleaning_operator import DataCleaningOperator

DEFAULT_DATE = datetime.today()


class TestDataCleaningOperator(unittest.TestCase):
    """
    Class to execute unit tests for the operator 'DataCleaningOperator'.
    """

    def setUp(self) -> None:
        super().setUp()
        self.dag = DAG(
            dag_id="test_dag_data_cleaning",
            schedule_interval=None,
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
                "output_to_xcom": True,
            },
        )
        self._initialize_test_data()

    def _initialize_test_data(self) -> None:
        # Test data set here as instance variables such as self.test_data_correct
        ...

    def test_operator_cleans_dataset_which_matches_schema(self) -> None:
        """
        Test: Attempt to clean a dataset which matches the provided schema.
        Verification: Returns the original dataset, unchanged.
        """
        task = DataCleaningOperator(
            task_id="test_operator_cleans_dataset_which_matches_schema",
            schema_fields=self.test_schema_nest,
            data_file_object=deepcopy(self.test_data_correct),
            dag=self.dag,
        )
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        result: List[dict] = task.execute(ti.get_template_context())

        self.assertEqual(result, self.test_data_correct)
However, when the tests are run, the following error is raised:
airflow.exceptions.DagRunNotFound: DagRun for 'test_dag_data_cleaning' with date 2022-02-22 12:09:51.538954+00:00 not found
This is related to the line in which a task instance is instantiated in test_operator_cleans_dataset_which_matches_schema.
Why can't Airflow locate the test_dag_data_cleaning DAG? Is there a specific configuration I've missed? Do I need to create a DAG run instance as well, or add the DAG to the DagBag manually, given that this test DAG lives outside my standard DAG directory? All normal (non-test) DAGs in my DAG directory run correctly.
In case it helps, my current Airflow version is 2.2.3 and the structure of my project is:
airflow
├─ dags
├─ plugins
| ├─ ...
| └─ operators
| ├─ ...
| └─ data_cleaning_operator.py
|
└─ tests
├─ ...
└─ operators
└─ test_data_cleaning_operator.py
Upvotes: 7
Views: 3419
Reputation: 226
The code you have written uses the Airflow 2.0 style of unit test. When you upgraded to Airflow 2.2.3, the unit test started requiring you to create a DagRun before you run the task.
Below is the sample code which worked for me:
import unittest

import pendulum
from airflow import DAG
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

from operators.test_operator import EvenNumberCheckOperator

DEFAULT_DATE = pendulum.datetime(2022, 3, 4, tz='America/Toronto')
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"


class TestEvenNumberCheckOperator(unittest.TestCase):
    def setUp(self):
        super().setUp()
        self.dag = DAG('test_dag4', default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})
        self.even = 10
        self.odd = 11
        EvenNumberCheckOperator(
            task_id=TEST_TASK_ID,
            my_operator_param=self.even,
            dag=self.dag,
        )

    def test_even(self):
        """Tests that the EvenNumberCheckOperator returns True for 10."""
        dagrun = self.dag.create_dagrun(
            state=DagRunState.RUNNING,
            execution_date=DEFAULT_DATE,
            # data_interval=DEFAULT_DATE,
            start_date=DEFAULT_DATE,
            run_type=DagRunType.MANUAL,
        )
        ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
        ti.task = self.dag.get_task(task_id=TEST_TASK_ID)
        result = ti.task.execute(ti.get_template_context())
        assert result is True
Upvotes: 7