Reputation: 1285
I have an Airflow DAG, and what I am trying to do is read my variables stored in the Airflow UI (username and password) and export those values as environment variables in the OS. The reason is that I am using a dbt YAML file which requires me to read the environment variable dbt_user (the only other way is to set the password in the YAML file, which is not secure):
default:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xxxx
      user: "{{ env_var('dbt_user') }}"
I tried to write a DAG which does the export with a BashOperator, but it does not seem to set the environment variable:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 1),
    'retries': 0
}

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    task_1 = BashOperator(
        task_id='get_variables',
        bash_command='export dbt_user={{ var.value.dbt_user }} ',
        env=os.environ.copy(),
        dag=dag
    )

    task_2 = BashOperator(
        task_id='load_seed_data_once',
        bash_command='echo $dbt_user',
        dag=dag
    )

    task_1 >> task_2
When I echo the variable, we can see that nothing is being set. Does anyone know how to set an environment variable using the BashOperator?
[2021-11-04 12:00:34,452] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo $dbt_user']
[2021-11-04 12:00:34,463] {subprocess.py:74} INFO - Output:
[2021-11-04 12:00:34,464] {subprocess.py:78} INFO -
[2021-11-04 12:00:34,465] {subprocess.py:82} INFO - Command exited with return code 0
[2021-11-04 12:00:34,494] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=sample, task_id=load_seed_data_once, execution_date=20211104T120032, start_date=20211104T120034, end_date=20211104T120034
[2021-11-04 12:00:34,517] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-11-04 12:00:34,555] {local_task_job.py:149} INFO - Task exited with return code 0
Update:
I also tried doing it via the PythonOperator, but that didn't work either. It gave me raise KeyError(key) from None KeyError: 'variable_1':
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 1),
    'retries': 0
}

def set_env():
    os.environ["variable_1"] = "value_1"

def print_env_var():
    print(os.environ["variable_1"])

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    set_env_task = PythonOperator(
        task_id='python_task',
        python_callable=set_env,
        dag=dag
    )

    print_env_task = PythonOperator(
        task_id='load_seed_data_once',
        python_callable=print_env_var,
        dag=dag
    )

    set_env_task >> print_env_task
Upvotes: 4
Views: 19212
Reputation: 237
Adding to @Yannick's answer. Since every operator usually runs in its own environment, you have to set that environment accordingly. In the OP's case, which is to run dbt, how to do so will depend on how dbt is being executed, i.e., via a DockerOperator, a KubernetesPodOperator, a BashOperator, a PythonOperator, and so forth.
Most of those operators have an env-like parameter that can be used to export environment variables to the runtime during execution. The following snippet provides an example of how this could be accomplished:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client.models import V1EnvVar
from datetime import datetime
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 0
}

def python_function(MY_VAR):
    os.environ['MY_VAR'] = MY_VAR
    # do something which requires the env var to be set
    ...

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    k8s_task = KubernetesPodOperator(
        task_id='k8s_task',
        env_vars=[V1EnvVar(name='MY_VAR', value='value')],
        # ... other required arguments (image, name, etc.)
    )

    docker_task = DockerOperator(
        task_id='docker_task',
        image="alpine",
        environment={'MY_VAR': 'value'},
        # ... other arguments as needed
    )

    bash_task = BashOperator(
        task_id='bash_task',
        env={'MY_VAR': 'value'},
        bash_command="echo $MY_VAR",
    )

    python_task = PythonOperator(
        task_id='python_task',
        op_kwargs={'MY_VAR': 'value'},
        python_callable=python_function,
    )
For PythonOperator, however, it is not possible to set environment variables through an operator parameter; therefore, in the above example, a workaround is used to set the environment using op_kwargs and os.
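As a sketch of how that workaround plays out for the dbt use case (the callable name and the dbt command here are illustrative assumptions, not from the answer): a child process started inside the callable inherits whatever was written to os.environ before it.
import os
import subprocess

def run_dbt_with_env(dbt_user):
    # Export the variable in this task's own process...
    os.environ['dbt_user'] = dbt_user
    # ...so the dbt child process inherits it and env_var('dbt_user') resolves.
    subprocess.run(['dbt', 'run'], check=True)
The callable would then be wired up like python_function above, e.g. with op_kwargs={'dbt_user': '{{ var.value.dbt_user }}'} so the Airflow Variable is rendered before the task runs.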
Upvotes: 3
Reputation: 1544
The BashOperator and PythonOperator - any operator, I think - start a new child process and will only inherit the environment set in the container image, at runtime (e.g. via compose or a k8s deployment), or by a script run before starting Airflow (e.g. the entrypoint).
That's why you have the env parameter in BashOperator: to pass whatever dict of env vars you want to set for the script.
You can pass the dbt_user and password from the Airflow Variables there too, as env is templated:
env={'dbt_user': '{{ var.value.dbt_user }}'}
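For example, a minimal sketch of a dbt task along those lines (the task name, the dbt command, and the dbt_password Variable are assumptions for illustration):
run_dbt = BashOperator(
    task_id='run_dbt',
    # env is templated, so the Airflow Variables are rendered at runtime
    env={
        'dbt_user': '{{ var.value.dbt_user }}',
        'dbt_password': '{{ var.value.dbt_password }}',  # assumed Variable
    },
    bash_command='dbt run',
)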
You can also set env in the DAG's default_args to make it available to all tasks, so you don't need to set it individually.
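A minimal sketch of that, assuming the tasks that need the variable accept an env argument (as BashOperator does):
default_args = {
    'owner': 'airflow',
    # picked up by operators in this DAG that accept an `env` argument
    'env': {'dbt_user': '{{ var.value.dbt_user }}'},
}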
Lastly, if you use the LocalExecutor, you can instead do the following in the first bash task:
echo "export dbt_user={{ var.value.dbt_user }}" >> ~/.bashrc
It will make the exported vars accessible in any new shell. Note this would not work with the KubernetesExecutor, as a new container is started for each task - but there are ways around it.
Upvotes: 2