Adam

Reputation: 1285

How do we set OS environment variables in Airflow?

I have an Airflow DAG, and what I am trying to do is read my variables stored in the Airflow UI (username and password) and export their values as OS environment variables. The reason is that I am using a dbt YAML file which requires me to read the environment variable dbt_user (the only other way is to set the password in the YAML file, which is not secure):

default:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xxxx
      user: "{{ env_var('dbt_user') }}"

I tried to write a DAG that does the export with a BashOperator, but it does not seem to set the environment variable.

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020,8,1),
    'retries': 0
}


with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    task_1 = BashOperator(
        task_id='get_variables',
        bash_command='export dbt_user={{ var.value.dbt_user }} ',
        env = os.environ.copy(),
        dag=dag
    )

    task_2 = BashOperator(
        task_id='load_seed_data_once',
        bash_command='echo $dbt_user',
        dag=dag
    )

task_1 >> task_2

When I echo the variable, we can see nothing is being set. Does anyone know how to set an environment variable using the BashOperator?

[2021-11-04 12:00:34,452] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo $dbt_user']
[2021-11-04 12:00:34,463] {subprocess.py:74} INFO - Output:
[2021-11-04 12:00:34,464] {subprocess.py:78} INFO - 
[2021-11-04 12:00:34,465] {subprocess.py:82} INFO - Command exited with return code 0
[2021-11-04 12:00:34,494] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=sample, task_id=load_seed_data_once, execution_date=20211104T120032, start_date=20211104T120034, end_date=20211104T120034
[2021-11-04 12:00:34,517] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-11-04 12:00:34,555] {local_task_job.py:149} INFO - Task exited with return code 0

Update:

I also tried doing it via the PythonOperator, but that didn't work either. It gave me: raise KeyError(key) from None KeyError: 'variable_1'

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020,8,1),
    'retries': 0
}

def set_env():
    os.environ["variable_1"] = "value_1"

def print_env_var():
    print(os.environ["variable_1"])


with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    set_env_task = PythonOperator(
        task_id='python_task', 
        python_callable=set_env,
        dag=dag
    )

    print_env_task = PythonOperator(
        task_id='load_seed_data_once',
        python_callable=print_env_var,
        dag=dag
    )

set_env_task >> print_env_task

Upvotes: 4

Views: 19212

Answers (2)

Guilherme Z. Santos

Reputation: 237

Adding to @Yannick's answer: since every operator usually runs in its own environment, you have to set that environment accordingly. In the OP's case, which is to run dbt, the right place depends on how dbt is being executed, e.g. via a DockerOperator, a KubernetesPodOperator, a BashOperator, or a PythonOperator, and so forth.

Most of those operators have an env-like parameter that can be used to pass environment variables to the runtime during execution. The following snippet provides an example of how this could be accomplished:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client.models import V1EnvVar
from datetime import datetime
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 0
}

def python_function(MY_VAR):
    os.environ['MY_VAR'] = MY_VAR
    # do something which requires env var set
    ...

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    k8s_task = KubernetesPodOperator(
        task_id='k8s_task',
        env_vars=[V1EnvVar(name='MY_VAR', value='value')],
        ...
    )

    docker_task = DockerOperator(
        task_id='docker_task',
        image="alpine",
        ...
        environment={'MY_VAR': 'value'},
    )

    bash_task = BashOperator(
        task_id='bash_task',
        env={'MY_VAR': 'value'},
        bash_command="echo $MY_VAR",
    )

    python_task = PythonOperator(
        task_id='python_task',
        op_kwargs={'MY_VAR': 'value'},
        python_callable=python_function,
    )

The PythonOperator, however, has no operator parameter for setting environment variables, so in the example above a workaround is used: the value is passed through op_kwargs and written to os.environ inside the callable.
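Applied to the OP's dbt case, a minimal sketch of that workaround (assuming dbt is installed in the worker environment, an Airflow Variable named dbt_user exists, and the task is defined inside the same with DAG(...) block as above) could fetch the Variable and export it before invoking dbt:

from airflow.operators.python import PythonOperator
from airflow.models import Variable
import os
import subprocess

def run_dbt():
    # Expose the Airflow Variable to this task's own process;
    # dbt inherits that environment, so env_var('dbt_user') can resolve.
    os.environ['dbt_user'] = Variable.get('dbt_user')
    subprocess.run(['dbt', 'run'], check=True)

dbt_task = PythonOperator(
    task_id='run_dbt',
    python_callable=run_dbt,
)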

Upvotes: 3

yan-hic

Reputation: 1544

The BashOperator and PythonOperator - any operator, I think - start a new child process and will only inherit the environment set in the container image, at runtime (e.g. compose or k8s deploy), or by a script run before starting Airflow (e.g. the entrypoint).

That's why BashOperator has the env parameter: you pass it a dict of whatever environment variables you want set for the script. You can pass the dbt_user and password from the Airflow Variables there too, since env is templated:


env={'dbt_user': '{{ var.value.dbt_user }}'}
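
For example, a sketch of a single BashOperator running dbt with both credentials rendered from Airflow Variables (dbt_password is an assumed second Variable; os.environ is merged in because a provided env replaces the child process's inherited environment):

import os
from airflow.operators.bash import BashOperator

run_dbt = BashOperator(
    task_id='run_dbt',
    bash_command='dbt run',
    env={
        **os.environ,  # keep PATH etc., since env replaces the inherited environment
        # env is a templated field, so the Variables are rendered at runtime
        'dbt_user': '{{ var.value.dbt_user }}',
        'dbt_password': '{{ var.value.dbt_password }}',
    },
)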

You can also set env in the DAG's default_args to make it available to all tasks, so you don't need to set it individually.
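
A sketch of that (assuming the same Variable; default_args entries should only be picked up by operators that actually accept the parameter, such as BashOperator's env):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 8, 1),
    # applied to every operator in the DAG that accepts an `env` argument
    'env': {'dbt_user': '{{ var.value.dbt_user }}'},
}

with DAG('sample_env_defaults', default_args=default_args, schedule_interval='@once') as dag:
    seed = BashOperator(task_id='seed', bash_command='echo $dbt_user')
    run = BashOperator(task_id='run', bash_command='echo $dbt_user')

    seed >> run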

Lastly, if you use the LocalExecutor, you can instead do the following in the first bash task:

echo "export dbt_user={{ var.value.dbt_user }} >> ~/.bashrc

It will make the exported vars accessible in any new shell. Note this would not work with the KubernetesExecutor, as a new container is started for each task - but there are ways around it.
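
A sketch of that pattern (two BashOperator tasks defined inside a DAG as usual; the downstream task sources ~/.bashrc explicitly, since the non-interactive shell started by BashOperator does not read it automatically):

from airflow.operators.bash import BashOperator

persist_var = BashOperator(
    task_id='persist_var',
    # append the export so later shells on this worker can pick it up
    bash_command='echo "export dbt_user={{ var.value.dbt_user }}" >> ~/.bashrc',
)

use_var = BashOperator(
    task_id='use_var',
    # source the file explicitly before using the variable
    bash_command='source ~/.bashrc && echo $dbt_user',
)

persist_var >> use_var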

Upvotes: 2
