Nikko

Reputation: 1572

task got unexpected argument 'dag' in airflow

I have created a task using PythonOperator. It calls a function defined in another folder and passes it an argument. The task fails with an error about an unexpected dag argument, even though dag=dag is required to attach the operator to its DAG.

dags/
- my_dag.py
  sub_folder/
  - __init__.py
  - my_functions.py

My DAG contains task1 and task2. Each calls a function from sub_folder and passes it an argument to print.

my_dag.py

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from sub_folder.my_functions import task1, task2

args = {
    'owner': 'hello',
    'start_date': dt.datetime(2019, 1, 1),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2)
}

dag = DAG(
    'try',
    default_args = args,
    schedule_interval = dt.timedelta(minutes=2))

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    provide_context=True,
    op_kwargs={'idx': "World!"},
    dag=dag,
)

task1 >> task2

The callables are simple functions that print the argument passed to them.

my_functions.py

def task1(idx):
    print(f"Task 1! {idx}")

def task2(idx):
    print(f"Task 2! {idx}")

My task1 keeps retrying and eventually fails. I looked into the logs and found that it raises:

TypeError: task1() got an unexpected keyword argument 'dag'

I don't understand what is happening here. As far as I know, dag=dag is required: it tells the operator which DAG it belongs to.

Upvotes: 4

Views: 3095

Answers (1)

mustafagok

Reputation: 1802

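There is a name conflict: the imported function my_functions.task1 and the PythonOperator instance are both bound to the name task1. Try importing the module under an alias so the two names stay distinct: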

import sub_folder.my_functions as mf  # changed

task1 = PythonOperator(
    task_id='task1',
    python_callable=mf.task1,  # changed
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)
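A separate thing worth checking, as far as I can tell from Airflow 1.x behaviour: with provide_context=True, PythonOperator passes the task context (which includes a dag entry) to the callable as keyword arguments, so a function that only accepts idx would raise this TypeError regardless of how it is imported. The sketch below reproduces that in plain Python, without Airflow. The names fake_context and task1_with_context are made up for illustration, and the two context keys are only a small stand-in for what Airflow really passes.

```python
def task1(idx):
    # Original callable: accepts only `idx`.
    print(f"Task 1! {idx}")


def task1_with_context(idx, **context):
    # Variant that absorbs any extra keyword arguments.
    print(f"Task 1! {idx}")
    return sorted(context)


# Hypothetical stand-in for the context keyword arguments (assumption:
# the real context holds many more keys than these two).
fake_context = {"dag": object(), "ds": "2019-01-01"}

# Context keys merged with op_kwargs, then passed as keyword arguments.
call_kwargs = {**fake_context, "idx": "Hello "}

try:
    task1(**call_kwargs)
    error_message = None
except TypeError as exc:
    # The strict signature rejects the extra `dag` keyword.
    error_message = str(exc)

print(error_message)

# The **context variant accepts the same call without error.
extra_keys = task1_with_context(**call_kwargs)
print(extra_keys)
```

If that is the cause here, either give task1 and task2 a **kwargs parameter or drop provide_context=True when the functions do not need the context.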

Upvotes: 4
