Reputation: 367
I'm trying to write a PythonOperator in an Airflow DAG and pass certain parameters to the Python callable.
My code looks like this:
def my_sleeping_function(threshold):
    print(threshold)

fmfdependency = PythonOperator(
    task_id='poke_check',
    python_callable=my_sleeping_function,
    provide_context=True,
    op_kwargs={'threshold': 100},
    dag=dag)

end = BatchEndOperator(
    queue=QUEUE,
    dag=dag)

start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)
But I keep getting the below error.
TypeError: my_sleeping_function() got an unexpected keyword argument 'dag_run'
I'm not able to figure out why.
Upvotes: 11
Views: 26254
Reputation: 141
This is an old question now, with a few solutions. None of them say why you're getting the error though.
It's because of provide_context=True. It passes an additional set of keyword arguments to the function. A list of what it passes can be found here:
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-variables
As you can see, dag_run is there. You can either set provide_context=False or add **kwargs to your function to handle them.
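As a minimal sketch of the first option, reusing the operator from the question, you would just flip the flag and leave everything else unchanged:

fmfdependency = PythonOperator(
    task_id='poke_check',
    python_callable=my_sleeping_function,
    provide_context=False,  # Airflow no longer injects dag_run, ds, ti, ... into the callable
    op_kwargs={'threshold': 100},
    dag=dag)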
Hopefully this helps anyone else who stumbles on this question.
Upvotes: 2
Reputation: 2481
This is how you can pass arguments to a PythonOperator in Airflow.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from time import sleep
from datetime import datetime
def my_func(*op_args):
    print(op_args)
    return op_args[0]

with DAG('python_dag',
         description='Python DAG',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task',
                                 python_callable=my_func,
                                 op_args=['one', 'two', 'three'])

    dummy_task >> python_task
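If you prefer named arguments, as in the question, the same task can be declared with op_kwargs instead of op_args. A sketch, assuming it sits inside the same with DAG(...) block (the name my_keyword_func and its parameters are just for illustration):

    def my_keyword_func(first, second, third):
        print(first, second, third)

    python_task = PythonOperator(task_id='python_task',
                                 python_callable=my_keyword_func,
                                 op_kwargs={'first': 'one', 'second': 'two', 'third': 'three'})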
Upvotes: 4
Reputation: 1031
Add **kwargs to your Python callable's parameter list, after your threshold param.
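For example, that change applied to the function from the question looks like this:

def my_sleeping_function(threshold, **kwargs):
    # **kwargs absorbs the extra keyword arguments (dag_run, ds, ti, ...)
    # that Airflow passes to the callable when provide_context=True
    print(threshold)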
Upvotes: 11