Riyan Mohammed
Riyan Mohammed

Reputation: 367

Airflow Python Operator with a. return type

I have a python operator in my DAG. The python callable function is returning a bool value. But, when I run the DAG, I get the below error.

TypeError: 'bool' object is not callable

I modified the function to return nothing but then again I keep getting the below error

ERROR - 'NoneType' object is not callable

Below is my dag

def check_poke(threshold,sleep_interval):
flag=snowflake_poke(1000,10).poke()
#print(flag)
return flag

dependency = PythonOperator(
task_id='poke_check',
#python_callable=check_poke(129600,600),
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)

end = BatchEndOperator(
queue=QUEUE,
dag=dag)

start.set_downstream(dependency)
dependency.set_downstream(end)

Not able to figure out what it is that I am missing. Can someone help me out on this...Fairly new to airflow.

I edited the python operator in the dag as below

dependency = PythonOperator(
task_id='poke_check',
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)

But now, I get a different error.

Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: () takes no arguments (25 given)
[2019-02-15 05:30:25,375] {models.py:1298} INFO - Marking task as UP_FOR_RETRY
[2019-02-15 05:30:25,393] {models.py:1327} ERROR - () takes no arguments (25 given)

Upvotes: 1

Views: 5233

Answers (3)

Anibal Kolker
Anibal Kolker

Reputation: 1

The code expects a callable, not the result (as already pointed out).
You could use functools.Partial to fill out the arguments:

from functools import partial

def check_poke(threshold,sleep_interval):
   flag=snowflake_poke(1000,10).poke()
   return flag

func = partial(check_poke, 129600, 600)
dependency = PythonOperator(
    task_id='poke_check',
    provide_context=True,
    python_callable=func,
    dag=dag)

Upvotes: 0

Dan D.
Dan D.

Reputation: 74645

The argument name gives it away. You are passing the result of a call rather than a callable.

python_callable=check_poke(129600,600)

The second error states that the callable is called with 25 arguments. So a lambda: won't work. The following would work but ignoring 25 arguments is really questionable.

python_callable=lambda *args, **kwargs: check_poke(129600,600)

Upvotes: 1

y2k-shubham
y2k-shubham

Reputation: 11607

Agree with @Dan D. for the issue; but it's perplexing why his solution didn't work (it certainly works in python shell)

See if this finds you any luck (its just verbose variant of @Dan D.'s solution)

from typing import Callable

# your original check_poke function
def check_poke(arg_1: int, arg_2: int) -> bool:
    # do something
    # somehow returns a bool
    return arg_1 < arg_2

# a function that returns a callable, that in turn invokes check_poke
# with the supplied params
def check_poke_wrapper_creator(arg_1: int, arg_2: int) -> Callable[[], bool]:
    def check_poke_wrapper() -> bool:
        return check_poke(arg_1=arg_1, arg_2=arg_2)

    return check_poke_wrapper

..

# usage
python_callable=check_poke_wrapper_creator(129600, 600)

Upvotes: 0

Related Questions