Christopher Beck

Reputation: 765

Export environment variables at runtime with airflow

I am currently converting workflows that were previously implemented as bash scripts into Airflow DAGs. In the bash scripts, I simply exported the variables at runtime with

export HADOOP_CONF_DIR="/etc/hadoop/conf"

Now I'd like to do the same in Airflow, but haven't found a solution for this yet. The one workaround I found was setting the variables with os.environ[VAR_NAME] = 'some_text' outside of any method or operator, but that means they get exported the moment the script is loaded, not at runtime.
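For reference, that workaround looks roughly like the sketch below (the DAG name and schedule are placeholders); the assignment runs whenever the scheduler or webserver parses the DAG file, not when a task actually executes:

import os
from datetime import datetime

from airflow import DAG

# Runs at DAG-file parse time (whenever this module is imported),
# not when a task executes.
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"

dag = DAG(dag_id='my_spark_dag',            # placeholder DAG id
          start_date=datetime(2018, 1, 1),
          schedule_interval=None)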

When I try to call os.environ[VAR_NAME] = 'some_text' in a function that gets called by a PythonOperator, it does not work. My code looks like this:

import os

from airflow.operators.python_operator import PythonOperator

def set_env():
    # Set Hadoop/Spark related environment variables for the worker process
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)

Now when my SparkSubmitOperator gets executed, I get the exception:

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

This matters for my use case because I have a SparkSubmitOperator that submits jobs to YARN, so either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment. Setting them in my .bashrc or any other config file is sadly not possible for me, which is why I need to set them at runtime.

Preferably I'd like to set them in an operator that runs before the SparkSubmitOperator, but if there were a way to pass them as arguments to the SparkSubmitOperator, that would at least be something.

Upvotes: 7

Views: 7596

Answers (1)

Simon D

Reputation: 6229

From what I can see in the SparkSubmitOperator, you can pass environment variables to spark-submit as a dictionary:

:param env_vars: Environment variables for spark-submit. It
                 supports yarn and k8s mode too.
:type env_vars: dict

Have you tried this?
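If it helps, here is a rough, untested sketch of what that might look like; the import path assumes Airflow 1.x, and the application path and connection id are placeholders:

# In newer Airflow versions the operator lives under
# airflow.providers.apache.spark.operators.spark_submit instead.
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_with_env',
    application='/path/to/your/app.py',   # placeholder application
    conn_id='spark_default',              # placeholder Spark connection id
    env_vars={
        'HADOOP_CONF_DIR': '/etc/hadoop/conf',
        'YARN_CONF_DIR': '/etc/hadoop/conf',
    },
    dag=dag)                              # dag defined elsewhere in your DAG file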

Upvotes: 2
