Hans Bambel

Reputation: 956

Airflow: Setting an environment variable and environment in an Airflow task

I am currently trying to automate a data pipeline that consists of several scripts. Many of these scripts rely on an environment variable called DB_URL being set. In the Python scripts this variable is read via os.getenv('DB_URL', None).

I can execute a DAG with a BashOperator by setting DB_URL directly in front of the script invocation:

# Airflow 2.x import paths
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'Hans Bambel',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='get_data_from_testdb',
    default_args=default_args,
    description='Gets some data from the DB specified in the connection "test_db"',
    schedule_interval=None,
    start_date=datetime(2021, 2, 24)
) as dag:
    connection = BaseHook.get_connection("test_db")
    db_url = (f'postgresql://{connection.login}:{connection.password}'
              f'@{connection.host}:{connection.port}/{connection.schema}')

    test_db_call = BashOperator(
        task_id='test_db_call',
        bash_command=f'export DB_URL={db_url}; /path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
    )

But I would like to set DB_URL once for all the scripts that use the same environment variable, like this:

with DAG(
    dag_id='get_data_from_testdb',
    default_args=default_args,
    description='Gets some data from the DB specified in the connection "test_db"',
    schedule_interval=None,
    start_date=datetime(2021, 2, 24)
) as dag:
    connection = BaseHook.get_connection("test_db")
    db_url = (f'postgresql://{connection.login}:{connection.password}'
              f'@{connection.host}:{connection.port}/{connection.schema}')

    set_db_env = BashOperator(
        task_id='set-dburl',
        bash_command=f'export DB_URL={db_url}'
    )

    # activate_myenv = BashOperator(
    #     task_id='activate-conda-environment',
    #     bash_command='source activate myenv'
    # )

    test_db_call = BashOperator(
        task_id='test_db_call',
        bash_command='/path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
    )

    set_db_env >> test_db_call

Additionally, I would like to activate my conda environment beforehand (as prepared by the commented-out activate_myenv task), but I get the following error when adding it to the DAG:

[2021-02-25 17:07:12,923] {bash.py:158} INFO - Running command: source activate carex
[2021-02-25 17:07:12,932] {bash.py:169} INFO - Output:
[2021-02-25 17:07:12,942] {bash.py:173} INFO - bash: activate: No such file or directory
[2021-02-25 17:07:12,943] {bash.py:177} INFO - Command exited with return code 1

I expected each DAG to run in isolation, but not each task as well. Therefore, I expected my second DAG to work too. Is there something I can do to make it work?

Upvotes: 1

Views: 7311

Answers (1)

joao

Reputation: 2293

When you make any change to a Unix shell's environment, such as exporting a new variable, that change propagates down to child processes, but never up to the shell's parent. That's how Unix shells work.
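You can see this outside Airflow with a small, self-contained demonstration in plain Python (the variable name is illustrative; run it in a session where DB_URL is not already set): a child shell exports the variable, yet the parent process never sees it.

import os
import subprocess

# The child bash process sets DB_URL and can read it ...
subprocess.run(['bash', '-c', 'export DB_URL=demo; echo "child sees: $DB_URL"'])

# ... but the parent's environment is untouched.
print('parent sees:', os.environ.get('DB_URL'))  # -> parent sees: None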

So when your BashOperator does an "export DB_URL=...", that changes the shell running the set_db_env task, but as soon as that task finishes, its modified environment is gone, and with it the DB_URL variable you created. The next task, test_db_call, inherits the same environment that set_db_env started out with, not the modified one.
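As an aside, if only one task needs the variable, BashOperator accepts an env dict that is handed to that task's shell, so you can pass the value explicitly instead of exporting it. A minimal sketch based on your first DAG (note that, per the Airflow docs, passing env replaces the inherited environment rather than extending it):

    test_db_call = BashOperator(
        task_id='test_db_call',
        bash_command='/path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py',
        env={'DB_URL': db_url},  # visible only inside this task's shell
    )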

To make DB_URL available to all scripts, you can define it before the Airflow processes are started, typically in the .bashrc file of the user running the Airflow process. This way your scripts can read the variable in the usual way, with a line like the one below.
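For example, in that user's ~/.bashrc (the credentials here are placeholders):

export DB_URL=postgresql://user:password@host:5432/dbname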

Or you might use Airflow's Variables: in the Airflow UI, under menu Admin / Variables, define the key DB_URL, set the value, and save it. Then you can use the mechanism described in the Concepts docs with Jinja templates:

bash_command='echo {{ var.value.DB_URL }}'

to access the variable's value using the double curly braces syntax.
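Applied to your DAG, the task could look like this untested sketch, assuming a DB_URL variable was created as described above (the inline VAR=value prefix makes the variable visible to that command only):

    test_db_call = BashOperator(
        task_id='test_db_call',
        # the template is rendered at run time, just before the command executes
        bash_command='DB_URL="{{ var.value.DB_URL }}" /path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
    )

In Airflow 2 you can also create the variable from the command line with airflow variables set DB_URL '<value>'.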

Upvotes: 1
