Michael Germ

Reputation: 25

Import Airflow variable to PySpark

Lately I have been playing around with Airflow and PySpark. I saw that Airflow has a number of variables. My aim is to read one of those variables and pass it to my PySpark script. So far I have tried to echo the value of the variable (which worked), but I couldn't find a way to get it into PySpark (I want to assign the value of that variable to another variable in my PySpark script). I am also attaching my code (job_id is the variable I am talking about).

test_bash = """
export un_id={{ti.job_id}}
echo $un_id
"""

bash_task = BashOperator(
    task_id='test',
    bash_command=test_bash,
    xcom_push=True,
    provide_context=True,
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
    rt = ti.xcom_pull(task_ids='test')
    print(rt)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag
)

#############
bash_task >> pull_task

Any idea how I should carry on, or whether I am doing something wrong?

Upvotes: 0

Views: 1728

Answers (2)

Gerasimos

Reputation: 319

I haven't tried what @kaxil suggested, but if I understood your question correctly you want to retrieve the run_id variable from Airflow and use it in your Python (PySpark) script. If that's the case, I assume that you use a BashOperator to spark-submit your job. When submitting a Spark job you are allowed to pass some arguments along with it. Those arguments appear as system arguments, which you can see if you do a print(sys.argv) (useful to see in which position your variable is). Since you already pushed the variable with bash_task, you will have to pull it. So when you submit your Spark job you should also add an extra argument like this:

cmd=""spark-submit your-pyspark-file.py {{ ti.xcom_pull("test") }}

retrieval = BashOperator(
    namespace='randomname',
    arguments=[cmd],
    name='example-dag1',
    task_id='name-you-desire',
    provide_context=True,
    get_logs=True, 
    dag=dag)

Then, if you execute print(sys.argv) in your script, you will see your variable among the arguments, and you can refer to it as sys.argv[1] if it is in the second position (sys.argv[0] is the script path, so the first argument you pass lands at index 1, and so on).
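For instance, a minimal sketch of what the receiving script (your-pyspark-file.py from the command above) might look like; the app name and the surrounding SparkSession boilerplate here are illustrative assumptions, the relevant part is reading sys.argv[1]:

# your-pyspark-file.py
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # sys.argv[0] is the script path; sys.argv[1] is the value pulled from
    # XCom and appended by the spark-submit command in the BashOperator.
    un_id = sys.argv[1]

    spark = SparkSession.builder.appName("example-dag1").getOrCreate()
    print("Received Airflow value: {}".format(un_id))
    spark.stop()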

Upvotes: 0

kaxil

Reputation: 18824

This value is actually called run_id and can be accessed via the context or macros.

In the PythonOperator it is accessed via the context, and in the BashOperator it is accessed via Jinja templating on the bash_command field.

More info on what's available in macros:

https://airflow.incubator.apache.org/code.html#macros

More info on jinja:

https://airflow.incubator.apache.org/concepts.html#jinja-templating

from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


dag = DAG(
    dag_id='run_id',
    schedule_interval=None,
    start_date=datetime(2017, 2, 26)
)

def my_func(**kwargs):
    context = kwargs
    print(context['dag_run'].run_id)

t1 = PythonOperator(
    task_id='python_run_id',
    python_callable=my_func,
    provide_context=True,
    dag=dag
    )

t2 = BashOperator(
    task_id='bash_run_id',
    bash_command='echo {{run_id}}',
    dag=dag)

t1.set_downstream(t2)

Use this DAG as an example and check the log for each operator; you should see the run_id printed in the log.
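To tie this back to getting the value into PySpark: since bash_command is templated, you could pass run_id straight to spark-submit and read it in the script via sys.argv, as described in the other answer. A rough sketch, where the task_id and script name are just placeholders:

spark_submit = BashOperator(
    task_id='spark_submit_run_id',  # hypothetical task_id
    bash_command='spark-submit your-pyspark-file.py {{ run_id }}',
    dag=dag)

# inside your-pyspark-file.py the value is then available as sys.argv[1]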

Upvotes: 1
