Reputation: 25
Lately I have been playing around with Airflow and PySpark. I saw that Airflow has a number of variables. My aim is to parse one of those variables and import it into my PySpark script. So far I have managed to echo the value of the variable (that worked), but I couldn't find a way to import it into PySpark (I want to pass the value of that variable to another variable in my PySpark script). I also attach my code (job_id is the variable I am talking about).
test_bash = """
export un_id={{ti.job_id}}
echo $un_id
"""
bash_task = BashOperator(
task_id='test',
bash_command=test_bash,
xcom_push=True,
provide_context=True,
dag=dag)
def pull_function(**kwargs):
    ti = kwargs['ti']
    rt = ti.xcom_pull(task_ids='test')
    print(rt)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag
)
#############
bash_task >> pull_task
Any idea how I should carry on, or whether I am doing something wrong?
Upvotes: 0
Views: 1728
Reputation: 319
I haven't tried what @kaxil suggested, but if I understood your question correctly, you want to retrieve the run_id variable from Airflow and use it in your Python (PySpark) script. If that's the case, I assume that you use a BashOperator to spark-submit your job. When submitting a Spark job you are allowed to submit (along with your job) some arguments. Those arguments appear as system arguments, which you can see if you do a print(sys.argv) (useful to see in which position your variable is).
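For illustration only, a minimal, hypothetical script (the file name is an assumption, not from the question) that simply prints its arguments so you can see where the value lands:

# inspect_args.py -- hypothetical helper, only to show where arguments land
import sys

if __name__ == "__main__":
    # Submitted as: spark-submit inspect_args.py some_value
    # sys.argv[0] is the script path, sys.argv[1] is "some_value"
    print(sys.argv)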
Since you already pushed the variable with the bash_task, you will have to pull it. So when you submit your Spark job you should also add an extra argument like this:
cmd=""spark-submit your-pyspark-file.py {{ ti.xcom_pull("test") }}
retrieval = BashOperator(
namespace='randomname',
arguments=[cmd],
name='example-dag1',
task_id='name-you-desire',
provide_context=True,
get_logs=True,
dag=dag)
Then, if you execute the print(sys.argv), you will be able to see your variable as an argument, and in your script you can refer to it with sys.argv[1] (if it is in the second position, 0 if it is in the first, etc.).
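Putting the pieces together, here is a sketch of what the submitted script (your-pyspark-file.py above) could look like, assuming the pulled value arrives as the first positional argument; the SparkSession usage is only illustrative:

# your-pyspark-file.py -- sketch, names are illustrative
import sys

from pyspark.sql import SparkSession

# The value pulled from XCom and appended to the spark-submit command
# arrives here as the first positional argument.
un_id = sys.argv[1]

spark = SparkSession.builder.appName("example-%s" % un_id).getOrCreate()
print("Received value from Airflow: %s" % un_id)
spark.stop()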
Upvotes: 0
Reputation: 18824
This value is actually called run_id and can be accessed via the context or macros. In the PythonOperator it is accessed via the context, and in the BashOperator it is accessed via jinja templating on the bash_command field.
More info on what's available in macros:
https://airflow.incubator.apache.org/code.html#macros
More info on jinja:
https://airflow.incubator.apache.org/concepts.html#jinja-templating
from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='run_id',
    schedule_interval=None,
    start_date=datetime(2017, 2, 26)
)

def my_func(**kwargs):
    context = kwargs
    print(context['dag_run'].run_id)

t1 = PythonOperator(
    task_id='python_run_id',
    python_callable=my_func,
    provide_context=True,
    dag=dag
)

t2 = BashOperator(
    task_id='bash_run_id',
    bash_command='echo {{run_id}}',
    dag=dag)

t1.set_downstream(t2)
Use this DAG as an example and check the log for each operator; you should see the run_id printed in the log.
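If the end goal is still to hand this value to a PySpark job, one option (not from the answer above, just a sketch relying on the same jinja templating) is to render run_id straight into a spark-submit command; the script path here is a placeholder:

# Sketch only: /path/to/your-pyspark-file.py is a placeholder.
# bash_command is a templated field, so {{ run_id }} is rendered
# before the command is executed.
spark_submit_task = BashOperator(
    task_id='spark_submit_with_run_id',
    bash_command='spark-submit /path/to/your-pyspark-file.py {{ run_id }}',
    dag=dag)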
Upvotes: 1