Riyan Mohammed

Reputation: 367

Airflow Python operator passing parameters

I'm trying to write a PythonOperator task in an Airflow DAG and pass certain parameters to the Python callable.

My code looks like below.

def my_sleeping_function(threshold):
   print(threshold)

fmfdependency = PythonOperator(
   task_id='poke_check',
   python_callable=my_sleeping_function,
   provide_context=True,
   op_kwargs={'threshold': 100},
   dag=dag)

end = BatchEndOperator(
   queue=QUEUE,
   dag=dag)

start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)

But I keep getting the below error.

TypeError: my_sleeping_function() got an unexpected keyword argument 'dag_run'

I can't figure out why.

Upvotes: 11

Views: 26254

Answers (3)

Phil

Reputation: 141

This is an old question now, with a few solutions. None of them say why you're getting the error though.

It's because of provide_context=True. It passes an additional set of keyword arguments (the task context) to the function, on top of whatever you put in op_kwargs. A list of what it passes can be found here: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-variables

As you can see, dag_run is there. You can either set provide_context=False or add **kwargs to your function to handle them. Hopefully this helps anyone else who stumbles on this question.
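
To make the cause concrete, here is a minimal sketch that reproduces the same kind of TypeError in plain Python, without Airflow installed. It assumes the context is merged with op_kwargs and passed as keyword arguments; fake_context is a hypothetical stand-in for the real context, which holds many more keys.

```python
def my_sleeping_function(threshold):
    print(threshold)


# Hypothetical stand-in for the context Airflow builds for a task run.
fake_context = {"dag_run": None, "ds": "2018-11-01"}
op_kwargs = {"threshold": 100}

# Assumption: with provide_context=True the callable receives the
# context keys and the op_kwargs together as keyword arguments.
call_kwargs = {**fake_context, **op_kwargs}

error_message = None
try:
    my_sleeping_function(**call_kwargs)
except TypeError as exc:
    # The function only declares `threshold`, so the extra keys are rejected.
    error_message = str(exc)
    print(error_message)
```

The function never gets as far as printing the threshold, because Python rejects the call before the body runs.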

Upvotes: 2

Shayan

Reputation: 2481

This is how you can pass arguments for a Python operator in Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def my_func(*op_args):
    # The items in op_args arrive as positional arguments.
    print(op_args)
    return op_args[0]

with DAG(
    'python_dag',
    description='Python DAG',
    schedule_interval='*/5 * * * *',
    start_date=datetime(2018, 11, 1),
    catchup=False,
) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=my_func,
        op_args=['one', 'two', 'three'],
    )

    dummy_task >> python_task
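
As a rough illustration of how op_args and op_kwargs reach the callable, the sketch below mimics the call in plain Python. call_like_python_operator is a hypothetical helper written for this example, not part of Airflow; it assumes the operator unpacks op_args positionally and op_kwargs as keywords.

```python
def my_func(*op_args):
    print(op_args)
    return op_args[0]


def call_like_python_operator(python_callable, op_args=None, op_kwargs=None):
    """Hypothetical helper: unpack the list and dict into a normal call."""
    return python_callable(*(op_args or []), **(op_kwargs or {}))


# Same arguments as the python_task above.
first = call_like_python_operator(my_func, op_args=['one', 'two', 'three'])
```

Here first ends up holding the first element of op_args, which is what my_func returns.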

Upvotes: 4

trejas

Reputation: 1031

Add **kwargs to your function's parameter list, after the threshold parameter, so the function accepts the extra context arguments that provide_context=True passes in.
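
A minimal sketch of what that looks like, called directly in plain Python so it runs without Airflow. The values passed for dag_run and ds are made up for illustration; in a real task they would come from the context.

```python
def my_sleeping_function(threshold, **kwargs):
    # kwargs absorbs the context keys (dag_run, ds, ...) instead of failing.
    dag_run = kwargs.get("dag_run")
    print(threshold)
    return threshold, dag_run


# Simulated call: threshold comes from op_kwargs, the rest stands in for context.
result = my_sleeping_function(threshold=100, dag_run="fake_run", ds="2018-11-01")
```

With this signature the operator definition in the question can stay as it is, including provide_context=True.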

Upvotes: 11
