Reputation: 493
I want to pass parameters into an Airflow DAG and use them in a Python function. I can use the parameters in a BashOperator, but I can't find any reference on how to use them in a Python function.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

# Parameter
owner = "{{ dag_run.conf['owner'] }}"
table = "{{ dag_run.conf['table'] }}"
run_this = "echo " + owner + "." + table

def test_func(owner, table):
    print(owner + "." + table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
)

task2 = PythonOperator(
    task_id='test_task2',
    python_callable=test_func(owner,table),
    dag=dag,
    queue='cdp_node53',
)
I want to pass the JSON below as parameters when triggering the DAG. "task1" works fine for me; I need to make "task2" work as well. Please help me correct the code above so that I can pass the parameters into the Python function.
{"owner":"test_owner","table":"test_table"}
Upvotes: 0
Views: 10888
Reputation: 3589
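To pass arguments to the function called by a PythonOperator, use either op_args (for positional arguments) or op_kwargs (for keyword arguments). Both parameters are also template fields, so their values can be Jinja expressions. Note that python_callable must be the function object itself (test_func), not the result of calling it (test_func(owner, table)); in your code the function runs once while the DAG file is parsed, and its return value, None, is what the operator receives.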
Refactoring your code using op_kwargs:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

# Parameter
owner = "{{ dag_run.conf['owner'] }}"
table = "{{ dag_run.conf['table'] }}"
run_this = "echo " + owner + "." + table

def test_func(owner, table):
    print(owner + "." + table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
)

task2 = PythonOperator(
    task_id='test_task2',
    # Pass the function itself; Airflow calls it when the task runs
    python_callable=test_func,
    # Templated values are rendered before the function is called
    op_kwargs={"owner": owner, "table": table},
    dag=dag,
    queue='cdp_node53',
)
Both tasks will now log INFO - test_owner.test_table.
Upvotes: 3