Esteban Sierra

Reputation: 81

How to use a common script in different DAGs?

I'm trying to import a script into many DAGs so I can call the same operation multiple times. What is the best way to implement this kind of solution?

Right now I have a folder structure as:

dags/
|-- some_dags_folder/
|---- some_dag.py
|-- other_dags_folder/
|---- another_dag.py
|-- utils/
|---- util_slack.py

To import the util_slack file I put the following into the DAG code (for this example, suppose the code is from some_dag.py):

from ..utils.util_slack import some_function

After placing everything in Airflow I get the following error:

Broken DAG: [/usr/local/airflow/dags/some_dags_folder/some_dag.py] attempted relative import with no known parent package

The util_slack script is a file that sends either a success message or a failure message, and it looks like this:

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.hooks.base_hook import BaseHook

CHANNEL = BaseHook.get_connection('Slack').login
TOKEN = BaseHook.get_connection('Slack').password

def slack_success(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execute(context=context)

def slack_fail(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execute(context=context)

The idea is that I can import the util_slack module, or any other module of my own, into multiple DAGs and invoke the function I need, like this:

...
from ..utils.util_slack import slack_success

...

def task_success(context):
    return slack_success(context)
...
some_task_in_dag = SSHOperator(
    ...
    on_success_callback=task_success
    ...)

Is this the best approach, or is it better to create custom plugins like the ones shown at https://airflow.apache.org/plugins.html?

Upvotes: 5

Views: 2994

Answers (2)

Priyadarshan Mohanty

Reputation: 182

The plugins folder is lazy loaded by default (it is loaded only once, or when the instance is restarted), so you can add a util folder there for all your common scripts. Since the dags folder is continuously re-parsed, it is better to keep unchanging common code in plugins, so as not to increase the burden on the scheduler.
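
For example, a minimal sketch of that layout (my_utils is just a placeholder name, and this assumes your Airflow version adds the plugins folder to sys.path alongside dags and config, as recent versions do):

plugins/
|-- my_utils/
|---- __init__.py
|---- util_slack.py

and then in a DAG:

# my_utils is a hypothetical package living under plugins/
from my_utils.util_slack import slack_success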

Upvotes: 0

Danila Ganchar

Reputation: 11302

I'm not sure plugins are a good approach in your case. Plugins integrate external features into the Airflow core (such as custom endpoints, custom login/auth, etc.).

Below is my approach. At the moment I have a lot of tasks that work with ClickHouse, so I need connection/truncate/insert/copy/etc. logic in different DAGs. Structure example:

dags
├── lib  # you can choose any name you like (utils, tools, etc.)
│   ├── ...  # just another common package / module
│   ├── default.py
│   ├── configurator.py
│   └── telegram.py
└── # dag1, dag2 ... dag_n

default.py - just default DAG params

from airflow.models import Variable

from lib.telegram import send_message

def on_success_callback(context):
    pass


def on_failure_callback(context):
    config = get_main_config()
    if not config.get('NOTIFY_ON_FAILURE'):
        return
    send_message('failed blabla')


def get_main_config():
    # I use a Variable with the key 'MAIN_CONFIG' to store common settings for all DAGs
    return Variable.get('MAIN_CONFIG', deserialize_json=True)


def get_default_args():
    return {
        'email_on_failure': False,
        'email_on_retry': False,
        'on_failure_callback': on_failure_callback,
        'on_success_callback': on_success_callback,
        # etc...
    }
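
For illustration, a DAG could consume these defaults like this (dag_id, start_date and schedule below are made up):

from datetime import datetime

from airflow import DAG

from lib.default import get_default_args

# example DAG wired to the shared default args and callbacks
dag = DAG(
    dag_id='example_dag',
    default_args=get_default_args(),
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
)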

configurator.py - all the necessary initialization in one place. I use inject, but you can use any tool / approach. This is just an example.

from lib.default import get_main_config
from airflow.hooks.base_hook import BaseHook


class InstancesPool:
    def __init__(self, slack_connection, db_connection):
        self._db_connection = db_connection
        self._slack_connection = slack_connection

    def get_slack_connection(self):
        return self._slack_connection

    def get_db_connection(self):
        return self._db_connection


class DbConnection:
    # just an example
    def __init__(self, user, password):
        pass


def configure():
    config = get_main_config()

    return InstancesPool(
        BaseHook.get_connection('Slack'),
        DbConnection(config['DB_USER'], config['DB_PASSWORD'])
    )

This way you will not have problems with imports or initialization. You just call:

from lib.configurator import configure


def my_task(ds, **kwargs):
    pool = configure()
    # pool.get_slack_connection() etc...
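
The task can then be attached to a DAG as usual, for example (the operator import path is the Airflow 1.x one, and task_id / dag are illustrative):

from airflow.operators.python_operator import PythonOperator

run_my_task = PythonOperator(
    task_id='my_task',
    python_callable=my_task,
    provide_context=True,  # Airflow 1.x: pass ds and the rest of the context to my_task
    dag=dag,
)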

Hope this helps.

Upvotes: 6
