Reputation: 765
I am currently converting workflows that were previously implemented as bash scripts into Airflow DAGs. In the bash scripts, I simply exported the variables at run time with
export HADOOP_CONF_DIR="/etc/hadoop/conf"
Now I'd like to do the same in Airflow, but haven't found a solution for this yet. The one workaround I found was setting the variables with os.environ[VAR_NAME] = 'some_text' outside of any method or operator, but that means they get exported the moment the script gets loaded, not at run time.
Now when I try to call os.environ[VAR_NAME] = 'some_text' in a function that gets called by a PythonOperator, it does not work. My code looks like this:
import os

def set_env():
    # Intended to export the variables at run time, inside the task
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)
Now when my SparkSubmitOperator gets executed, I get the exception:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
My use case is that I have a SparkSubmitOperator that submits jobs to YARN, so either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment. Setting them in my .bashrc or any other config is sadly not possible for me, which is why I need to set them at runtime.
Preferably I'd like to set them in an operator before executing the SparkSubmitOperator, but if it were possible to pass them as arguments to the SparkSubmitOperator, that would be at least something.
Upvotes: 7
Views: 7596
Reputation: 6229
From what I can see in the SparkSubmitOperator source, you can pass environment variables to spark-submit as a dictionary through the env_vars parameter:
:param env_vars: Environment variables for spark-submit. It
supports yarn and k8s mode too.
:type env_vars: dict
Have you tried this?
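To make the suggestion concrete, here is a minimal sketch of how that could look. It is not taken from a working setup: the helper names build_spark_env and make_submit_task, the task_id, and the application path are hypothetical, and the import path assumes the Apache Spark provider package (older Airflow 1.x releases exposed the operator under airflow.contrib.operators.spark_submit_operator instead). Because each Airflow task normally runs in its own process, changes made to os.environ inside a PythonOperator callable are not visible to a later task, which is why the variables are handed to the operator directly here.

```python
import shutil
import sys


def build_spark_env(hadoop_conf_dir="/etc/hadoop/conf"):
    """Build the environment variables for spark-submit as a plain dict.

    Hypothetical helper: nothing is exported into the current process,
    the values are only collected so they can be passed as env_vars.
    """
    # Rough equivalent of `which python`; falls back to the running interpreter.
    python_bin = shutil.which("python") or sys.executable
    return {
        "HADOOP_CONF_DIR": hadoop_conf_dir,
        "PYSPARK_PYTHON": python_bin,
        "PYSPARK_DRIVER_PYTHON": python_bin,
    }


def make_submit_task(dag):
    """Create a SparkSubmitOperator that receives the variables via env_vars."""
    # Assumed import path (Apache Spark provider package); adjust for your
    # Airflow version if the operator lives elsewhere.
    from airflow.providers.apache.spark.operators.spark_submit import (
        SparkSubmitOperator,
    )

    return SparkSubmitOperator(
        task_id="submit_spark_job",        # hypothetical task id
        application="/path/to/job.py",     # hypothetical application path
        env_vars=build_spark_env(),
        dag=dag,
    )
```

In a DAG file you would call make_submit_task(dag) with your existing dag object. Whether variables such as PATH or SPARK_HOME from the question also need to be passed this way depends on your setup, so treat the dict contents as a starting point.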
Upvotes: 2