Reputation: 956
I am currently trying to automate a data pipeline that consists of different scripts. Many of these scripts rely on the setting of an environment variable called DB_URL. In the Python scripts this variable is read in via os.getenv('DB_URL', None).

It is possible for me to execute a DAG by using a BashOperator and specifying DB_URL directly in front of the execution of the script:
from datetime import datetime, timedelta

# Imports assume Airflow 2.x module paths
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'Hans Bambel',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='get_data_from_testdb',
    default_args=default_args,
    description='Gets some data from the DB specified in the connection "test_db"',
    schedule_interval=None,
    start_date=datetime(2021, 2, 24)
) as dag:
    connection = BaseHook.get_connection("test_db")
    db_url = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + \
             '@' + str(connection.host) + ':' + str(connection.port) + '/' \
             + str(connection.schema)

    test_db_call = BashOperator(
        task_id='test_db_call',
        bash_command=f'export DB_URL={db_url}; /path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
    )
But I would like to set up DB_URL once for all scripts that use the same env variable, like this:
with DAG(
    dag_id='get_data_from_testdb',
    default_args=default_args,
    description='Gets some data from the DB specified in the connection "test_db"',
    schedule_interval=None,
    start_date=datetime(2021, 2, 24)
) as dag:
    connection = BaseHook.get_connection("test_db")
    db_url = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + \
             '@' + str(connection.host) + ':' + str(connection.port) + '/' \
             + str(connection.schema)

    set_db_env = BashOperator(
        task_id='set-dburl',
        bash_command=f'export DB_URL={db_url}'
    )
    # activate_myenv = BashOperator(
    #     task_id='activate-conda-environment',
    #     bash_command='source activate myenv'
    # )
    test_db_call = BashOperator(
        task_id='test_db_call',
        bash_command='/path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
    )

    set_db_env >> test_db_call
Additionally, I would like to activate my conda environment beforehand (as prepared by the activate_myenv task), but I get the following error when adding it to the DAG:
[2021-02-25 17:07:12,923] {bash.py:158} INFO - Running command: source activate carex
[2021-02-25 17:07:12,932] {bash.py:169} INFO - Output:
[2021-02-25 17:07:12,942] {bash.py:173} INFO - bash: activate: No such file or directory
[2021-02-25 17:07:12,943] {bash.py:177} INFO - Command exited with return code 1
I expected each DAG to run in isolation, but not each task as well; therefore, I expected my second DAG to work too. Is there something I can do to make it work?
Upvotes: 1
Views: 7311
Reputation: 2293
When you make any change to a unix shell's environment, such as creating a new variable, that change is propagated down to child processes, but never up to the shell's parent. That's how Unix shells work.
So when your BashOperator does an export DB_URL=..., that changes the shell that is running the set_db_env task, but as soon as that task finishes, its modified environment is gone, and with it the DB_URL variable you created. The next task, test_db_call, inherits the same environment that set_db_env started out with, not the one it changed.
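To see the same effect outside Airflow, here is a minimal sketch with two subprocess calls, each of which gets its own fresh shell, just as each BashOperator task does:

import subprocess

# Each run() spawns its own shell, like each BashOperator task.
subprocess.run('export DB_URL=postgresql://example; echo "first shell sees: $DB_URL"', shell=True)
subprocess.run('echo "second shell sees: $DB_URL"', shell=True)
# The second shell prints an empty value: the export died with the first shell.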
To make DB_URL available for all scripts, you can define it before the Airflow processes are run, typically in the .bashrc file of the user running the Airflow process. This way your shell script can directly access the variable's value in the usual way.
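For example, assuming user, password, host and schema below are just placeholders for your real connection details, a line like export DB_URL=postgresql://user:password@host:5432/schema in that .bashrc means the environment Airflow starts in already contains the variable, and the existing Python scripts pick it up unchanged:

import os

# DB_URL now comes from the environment the Airflow process inherited at startup
db_url = os.getenv('DB_URL', None)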
Or you might use Airflow's Variables: in the Airflow UI, under menu Admin / Variables, define the key DB_URL, set its value, and save it. Then you can use the templating mechanism described in the Concepts docs, i.e. Jinja templates:

bash_command='echo {{ var.value.DB_URL }}'

to access the variable's value using the double-brackets syntax.
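As a sketch of how that could look in the question's own task (paths taken from the question; Airflow renders the template before the shell runs, so the Variable named DB_URL ends up in the script's environment):

test_db_call = BashOperator(
    task_id='test_db_call',
    bash_command='export DB_URL="{{ var.value.DB_URL }}"; '
                 '/path/to/my/conda/environment/python /path/to/my/scripts/db_connection_test.py'
)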
Upvotes: 1