juwa92

Reputation: 53

Importing custom plugins in Airflow 2 [Cloud Composer]

I have a directory structure as such:

airflow_dags
├── dags
│   └── hk  
│       └── hk_dag.py  
├── plugins
│   └── cse   
│       └── operators   
│           └── cse_to_bq.py   
└── test
   └── dags   
       └── dag_test.py  

In the GCS bucket created by Cloud Composer, there is a plugins folder, and I upload the cse folder into it.

Now in my hk_dag.py file if I import the plugin like this:

from plugins.cse.operators.cse_to_bq import CSEToBQOperator

and run my unit test, it passes, but in Cloud Composer I get the error ModuleNotFoundError: No module named 'plugins'.

If I import the plugin like this in my hk_dag.py:

from cse.operators.cse_to_bq import CSEToBQOperator

my unit test fails with ModuleNotFoundError: No module named 'cse', but it works fine in Cloud Composer.

How do I resolve it?

Upvotes: 5

Views: 1892

Answers (1)

Alvaro

Reputation: 963

In Airflow 2.0, the plugins folder is added to sys.path, so you import your operator directly from the operators package inside it, without a plugins prefix.

In your case, it has to be something like:

from operators.cse.cse_to_bq import CSEToBQOperator

But first you have to change your folder structure to:

airflow_dags
├── dags
│   └── hk  
│       └── hk_dag.py  
├── plugins
│   └── operators   
│       └── cse   
│           └── cse_to_bq.py 
└── test
   └── dags   
       └── dag_test.py 
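
For the unit test to resolve the same import locally, the plugins folder also needs to be on sys.path in the test environment. Here is a minimal sketch of that idea, not a definitive setup. The helper add_plugins_to_path is a hypothetical name, the operator class is an empty stand-in, and the empty __init__.py files are an assumption. The demo builds a throwaway copy of the layout above in a temporary directory so it can run on its own:

```python
import importlib
import sys
import tempfile
from pathlib import Path


def add_plugins_to_path(repo_root: Path) -> Path:
    """Put <repo_root>/plugins on sys.path (hypothetical helper, not an Airflow API)."""
    plugins_dir = (repo_root / "plugins").resolve()
    if str(plugins_dir) not in sys.path:
        sys.path.insert(0, str(plugins_dir))
    return plugins_dir


# Demo: recreate the plugins part of the tree above in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "airflow_dags"
    pkg = root / "plugins" / "operators" / "cse"
    pkg.mkdir(parents=True)
    # Assumption: empty __init__.py files make both folders regular packages.
    (root / "plugins" / "operators" / "__init__.py").write_text("")
    (pkg / "__init__.py").write_text("")
    # Stand-in for the real operator, which would subclass Airflow's BaseOperator.
    (pkg / "cse_to_bq.py").write_text("class CSEToBQOperator:\n    pass\n")

    add_plugins_to_path(root)
    importlib.invalidate_caches()

    # Same dotted path a DAG would use: no "plugins." prefix.
    module = importlib.import_module("operators.cse.cse_to_bq")
    operator_name = module.CSEToBQOperator.__name__
```

In a real project, assuming the tests run under pytest, the sys.path.insert call would typically live in a conftest.py under test/, pointed at the repository's plugins folder. Setting PYTHONPATH to that folder before running the tests should have the same effect.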

Upvotes: 6
