Reputation: 81
I'm trying to import the same helper script into many DAGs so that each of them can reuse the same operation. What is the best way to do this?
Right now my folder structure is:
dags/
|-- some_dags_folder/
|---- some_dag.py
|-- other_dags_folder/
|---- another_dag.py
|-- utils/
|---- util_slack.py
To import the util_slack file, I put the following into the DAG code (for this example, suppose the code is in some_dag.py):
from ..utils.util_slack import some_function
After deploying everything to Airflow, I get the following error:
Broken DAG: [/usr/local/airflow/dags/some_dags_folder/some_dag.py] attempted relative import with no known parent package
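The error comes from how the DAG file is loaded: each DAG file is executed as a standalone module, not as part of a package, so ".." has no parent to resolve against. The sketch below reproduces this with only the standard library. It builds the folder layout from the question in a temporary directory; load_dag_file is a simplified, hypothetical stand-in for Airflow's loader (an assumption, not Airflow's actual code). It then shows that an absolute import, from utils.util_slack import slack_success, works once the dags folder is on sys.path, which Airflow does for its DAGS folder.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path
from typing import Tuple


def load_dag_file(path: Path):
    """Simplified stand-in for a DAG loader: run the file as a top-level module."""
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # no parent package, so __package__ == ""
    return module


def demo() -> Tuple[str, str]:
    with tempfile.TemporaryDirectory() as tmp:
        dags = Path(tmp) / "dags"
        (dags / "utils").mkdir(parents=True)
        (dags / "some_dags_folder").mkdir()
        (dags / "utils" / "__init__.py").write_text("")
        (dags / "utils" / "util_slack.py").write_text(
            "def slack_success(context):\n    return 'ok'\n"
        )
        dag_file = dags / "some_dags_folder" / "some_dag.py"

        # 1) Relative import, as in the question: fails, there is no parent package.
        dag_file.write_text("from ..utils.util_slack import slack_success\n")
        try:
            load_dag_file(dag_file)
            relative = "imported"
        except ImportError as exc:
            relative = str(exc)

        # 2) Absolute import, resolved from the dags folder once it is on sys.path.
        dag_file.write_text(
            "from utils.util_slack import slack_success\n"
            "RESULT = slack_success({})\n"
        )
        sys.path.insert(0, str(dags))
        try:
            absolute = load_dag_file(dag_file).RESULT
        finally:
            # Undo the side effects so the demo leaves the interpreter clean.
            sys.path.remove(str(dags))
            sys.modules.pop("utils.util_slack", None)
            sys.modules.pop("utils", None)
        return relative, absolute


if __name__ == "__main__":
    print(demo())
```

demo() returns the ImportError message for the relative import and 'ok' for the absolute one. The empty utils/__init__.py makes utils a regular package, so an unrelated package named utils elsewhere on sys.path cannot shadow it.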
The util_slack script sends either a success message or a failure message, and it looks like this:
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.hooks.base_hook import BaseHook

# Fetch the Slack connection once; this setup keeps the channel in login and the token in password.
slack_conn = BaseHook.get_connection('Slack')
CHANNEL = slack_conn.login
TOKEN = slack_conn.password

def slack_success(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execute(context=context)

def slack_fail(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execute(context=context)
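slack_success and slack_fail differ only in their message, and the module-level get_connection call runs every time a DAG file that imports util_slack is parsed. A minimal sketch of one alternative, assuming you are happy to build both callbacks from a factory and to resolve credentials on first use: fetch_slack_connection is a hypothetical stand-in for BaseHook.get_connection('Slack'), and send is a hypothetical callable that in real code would wrap SlackWebhookOperator(...).execute(context=context). The task_instance key and its dag_id/task_id attributes follow what Airflow passes to callbacks.

```python
from functools import lru_cache
from typing import Callable, Dict, Tuple

LOOKUPS = {"count": 0}  # counts credential lookups, for illustration only


def fetch_slack_connection() -> Tuple[str, str]:
    """Hypothetical stand-in for BaseHook.get_connection('Slack') -> (login, password)."""
    LOOKUPS["count"] += 1
    return "#alerts", "dummy-token"


@lru_cache(maxsize=1)
def slack_credentials() -> Tuple[str, str]:
    """Resolve channel and token on first use instead of at import time."""
    return fetch_slack_connection()


def make_slack_callback(
    status: str, send: Callable[[str, str], str]
) -> Callable[[Dict], str]:
    """Build a callback that differs from its siblings only by `status`."""

    def callback(context: Dict) -> str:
        channel, _token = slack_credentials()
        ti = context["task_instance"]
        return send(channel, f"{status}: {ti.dag_id}.{ti.task_id}")

    return callback
```

With slack_success = make_slack_callback("SUCCESS", send) defined in util_slack, a DAG can pass it directly as on_success_callback=slack_success; the task_success wrapper is not needed because the callback already accepts context.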
The idea is that I can import the util_slack module (or any other self-made module) into multiple DAGs and invoke the function I need, like this:
...
from ..utils.util_slack import slack_success
...
def task_success(context):
    return slack_success(context)
...
some_task_in_dag = SSHOperator(
    ...
    on_success_callback=task_success,
    ...)
Is this the best approach, or is it better to create custom plugins like the ones shown at https://airflow.apache.org/plugins.html?
Upvotes: 5
Views: 2994
Reputation: 182
The plugins folder is lazy-loaded by default (loaded only once, or again when the instance is restarted). You can add a util folder there for all your common scripts. Because the dags folder is continuously re-parsed, it is better to keep unchanging common code in plugins, so that it does not add to the scheduler's load.
Upvotes: 0
Reputation: 11302
I'm not sure plugins are a good approach in your case. Plugins integrate external features into the Airflow core (such as custom endpoints, custom login/auth, etc.).
Below is my approach. At the moment I have a lot of tasks that work with ClickHouse, so I need to get a connection, truncate, insert, copy, etc. in different DAGs. Structure example:
dags
├── lib # choose any name you like (utils, tools, etc.)
│ ├── ... just another common package / module
│ ├── default.py
│ ├── configurator.py
│ └── telegram.py
└── # dag1, dag2...dag_n
default.py - just the default DAG params:
from airflow.models import Variable

from lib.telegram import send_message

def on_success_callback(context):
    pass

def on_failure_callback(context):
    config = get_main_config()
    if not config.get('NOTIFY_ON_FAILURE'):
        return
    send_message('failed blabla')

def get_main_config():
    # I use a Variable with key 'MAIN_CONFIG' to store common settings for all DAGs
    return Variable.get('MAIN_CONFIG', deserialize_json=True)

def get_default_args():
    return {
        'email_on_failure': False,
        'email_on_retry': False,
        'on_failure_callback': on_failure_callback,
        'on_success_callback': on_success_callback,
        # etc...
    }
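To check the NOTIFY_ON_FAILURE gate and the shared defaults without a running Airflow, the same logic can be written with its dependencies passed in. This is a sketch, not the answer's code: get_config stands in for Variable.get('MAIN_CONFIG', deserialize_json=True), send_message for lib.telegram.send_message, and the **overrides parameter is a hypothetical way for a single DAG to change individual defaults such as retries.

```python
from typing import Any, Callable, Dict

Config = Dict[str, Any]


def make_on_failure_callback(
    get_config: Callable[[], Config], send_message: Callable[[str], None]
) -> Callable[[Dict[str, Any]], None]:
    """Failure callback that only notifies when the shared config allows it."""

    def on_failure_callback(context: Dict[str, Any]) -> None:
        # The config is read at call time, so flipping NOTIFY_ON_FAILURE
        # takes effect without editing any DAG file.
        if not get_config().get("NOTIFY_ON_FAILURE"):
            return
        send_message(f"failed: {context['task_instance'].task_id}")

    return on_failure_callback


def get_default_args(on_failure: Callable, **overrides: Any) -> Dict[str, Any]:
    """Shared defaults; each DAG can override individual keys."""
    args: Dict[str, Any] = {
        "email_on_failure": False,
        "email_on_retry": False,
        "on_failure_callback": on_failure,
    }
    args.update(overrides)  # a fresh dict per call, so DAGs never share mutations
    return args
```

A DAG would then pass something like get_default_args(callback, retries=3) as its default_args; DAGs that need no overrides simply call it without keyword arguments.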
configurator.py - all the necessary initialization in one place. I use inject, but you can use any tool or approach; this is just an example.
from airflow.hooks.base_hook import BaseHook

from lib.default import get_main_config

class InstancesPool:
    def __init__(self, slack_connection, db_connection):
        self._db_connection = db_connection
        self._slack_connection = slack_connection

    def get_slack_connection(self):
        return self._slack_connection

    def get_db_connection(self):
        return self._db_connection

class DbConnection:
    # just an example
    def __init__(self, user, password):
        pass

def configure():
    config = get_main_config()
    return InstancesPool(
        BaseHook.get_connection('Slack'),
        DbConnection(config['DB_USER'], config['DB_PASSWORD'])
    )
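If several tasks in the same worker process call configure(), the Variable and connection lookups repeat each time. A sketch of one variation, assuming per-process caching is acceptable for your settings: configure is wrapped in functools.lru_cache, the pool becomes a frozen dataclass with plain attributes instead of getter methods, and get_config/get_slack are hypothetical stand-ins for get_main_config() and BaseHook.get_connection('Slack'). The password field is excluded from repr so it does not end up in task logs.

```python
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class DbConnection:
    """Hypothetical DB handle; a real one would open a ClickHouse client."""

    user: str
    password: str = field(repr=False)  # keep secrets out of logs


@dataclass(frozen=True)
class InstancesPool:
    slack_connection: Any
    db_connection: DbConnection


def make_configure(
    get_config: Callable[[], Dict[str, str]], get_slack: Callable[[], Any]
) -> Callable[[], InstancesPool]:
    """Return a configure() that builds the pool once per process."""

    @lru_cache(maxsize=1)
    def configure() -> InstancesPool:
        config = get_config()
        return InstancesPool(
            slack_connection=get_slack(),
            db_connection=DbConnection(config["DB_USER"], config["DB_PASSWORD"]),
        )

    return configure
```

The trade-off is that a changed MAIN_CONFIG Variable is only picked up by new processes; drop the cache if settings must be re-read on every call.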
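This way you will not have problems with imports or initialization. Airflow puts the dags folder on sys.path, so absolute imports such as lib.configurator resolve from any DAG, and nothing is initialized until a task calls configure(). You just call:
from lib.configurator import configure

def my_task(ds, **kwargs):
    pool = configure()
    # pool.get_slack_connection() etc...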
Hope this helps.
Upvotes: 6