Nickson Ndangalasi

Reputation: 89

DagBag does not populate dags as expected

I want to test my DAGs to make sure they have certain default arguments and that none of them has import errors.

I am using DagBag to load the DAGs, then iterating over them and checking that each DAG's values are what I expect.

Because DagBag can also pick up the example DAGs that ship with Airflow, I am passing include_examples=False. However, when I do this, none of my DAGs are loaded into the DagBag.

Am I using DagBag incorrectly, or is there a better way to load and inspect DAGs when testing?

My code

from airflow.models import DagBag

def test_no_import_errors():
    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0, f"Import failures: {dag_bag.import_errors}"

Upvotes: 0

Views: 4822

Answers (3)

kehsihba19

Reputation: 83

By default, Airflow's DagBag looks for DAGs in the AIRFLOW_HOME/dags folder.

This location is stored in the airflow.cfg file (the dags_folder option). AIRFLOW_HOME defaults to ~/airflow, but you can point it at your own project directory by running:

export AIRFLOW_HOME=abs_path_of_your_folder

If you install Airflow with pip in a virtual environment, export the AIRFLOW_HOME variable first, then activate the virtual environment, and only then install Airflow. This makes sure your path ends up in the generated airflow.cfg file.

You can also check whether your folder was loaded correctly while running the unit test. The path is printed in the terminal, like this:
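
To double-check which folder a DagBag created without dag_folder would scan, the default rule described above can be sketched in plain Python. This is only an approximation: it ignores any dags_folder override in airflow.cfg, and expected_default_dags_folder is a hypothetical helper name, not part of Airflow.

```python
import os
from pathlib import Path


def expected_default_dags_folder(environ=None):
    """Approximate the default DAG folder: <AIRFLOW_HOME>/dags.

    Assumption: airflow.cfg does not override dags_folder. AIRFLOW_HOME
    falls back to ~/airflow when the variable is not set.
    """
    env = os.environ if environ is None else environ
    home = Path(env.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    return home / "dags"
```

Calling expected_default_dags_folder() inside a test and comparing the result with the path in the "Filling up the DagBag from ..." log line shows quickly whether AIRFLOW_HOME is set the way you think it is.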

[2022-02-03 20:45:57,657] {dagbag.py:500} INFO - Filling up the DagBag from /Users/kehsihba19/Desktop/airflow-test/dags

An example test file that checks DAGs for import errors, which also catches typos and cyclic task dependencies:

from airflow.models import DagBag
import unittest

class TestDags(unittest.TestCase):
    def test_DagBag(self):
        self.dag_bag = DagBag(include_examples=False)
        self.assertFalse(bool(self.dag_bag.import_errors))

if __name__ == "__main__":
    unittest.main()

Upvotes: 0

NicoE

Reputation: 4873

I was able to reproduce the problem. When creating the DagBag object, if you don't provide a value for the dag_folder parameter, no DAG is added to the collection.

So as Jarek stated, this works:

def test_no_import_errors():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert len(dag_bag.import_errors) == 0, f"Import failures: {dag_bag.import_errors}"

This is the example I made to test it:

# python -m unittest test_dag_validation.py 
import unittest
import logging
from airflow.models import DagBag


class TestDAGValidation(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        log = logging.getLogger()
        handler = logging.FileHandler("dag_validation.log", mode="w")

        handler.setLevel(logging.INFO)
        log.addHandler(handler)
        cls.log = log

    def test_no_import_errors(self):
        dag_bag = DagBag(dag_folder="dags/", include_examples=False)
        self.log.info(f"How Many DAGs?: {dag_bag.size()}")
        self.log.info(f"Import errors: {len(dag_bag.import_errors)}")
        assert len(dag_bag.import_errors) == 0, f"Import failures: {dag_bag.import_errors}"
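
For the other half of the question (checking default arguments), here is a minimal sketch of a helper that could be run over dag_bag.dags, which maps each dag_id to its DAG object. The helper name find_default_arg_violations and the required values below are hypothetical, and the stand-in objects only mimic the default_args attribute so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def find_default_arg_violations(dags, required):
    """Return one message per default_args entry that is missing or wrong."""
    problems = []
    for dag_id, dag in sorted(dags.items()):
        default_args = getattr(dag, "default_args", None) or {}
        for key, expected in required.items():
            if key not in default_args:
                problems.append(f"{dag_id}: missing default_args[{key!r}]")
            elif default_args[key] != expected:
                problems.append(
                    f"{dag_id}: default_args[{key!r}] is "
                    f"{default_args[key]!r}, expected {expected!r}"
                )
    return problems


# Stand-ins for DAG objects (hypothetical data, not real DAGs).
fake_dags = {
    "good_dag": SimpleNamespace(default_args={"owner": "data-team", "retries": 2}),
    "bad_dag": SimpleNamespace(default_args={"owner": "data-team"}),
}
violations = find_default_arg_violations(
    fake_dags, {"owner": "data-team", "retries": 2}
)
print(violations)
```

In a real test you would pass dag_bag.dags instead of fake_dags and assert that the returned list is empty, so a failure lists every offending DAG at once.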


Upvotes: 3

Jarek Potiuk

Reputation: 20097

When you construct a DagBag object, you can pass the folder where DagBag should look for the DAG files (the dag_folder parameter). I guess this is the problem.
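
A relative path such as "dags/" depends on the directory the tests are started from, so one option is to derive the folder from the test file's own location. The sketch below assumes a hypothetical layout with the test file in <project>/tests/ and the DAGs in <project>/dags/; resolve_dag_folder and load_dag_bag are illustrative names, not Airflow functions.

```python
from pathlib import Path


def resolve_dag_folder(test_file, folder_name="dags"):
    """Return <project root>/<folder_name> for a test file in <project root>/tests/."""
    project_root = Path(test_file).resolve().parent.parent
    return str(project_root / folder_name)


def load_dag_bag(dag_folder):
    """Build a DagBag for an explicit folder, skipping the example DAGs."""
    # Imported lazily so the path helper above works without Airflow installed.
    from airflow.models import DagBag

    return DagBag(dag_folder=dag_folder, include_examples=False)
```

Inside a test module this would be used as load_dag_bag(resolve_dag_folder(__file__)), which gives the same result whether the tests are launched from the project root or from the tests directory.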

Upvotes: 3
