Reputation: 1572
I have created a task using PythonOperator. It calls a function defined in another folder and passes it an argument. But the operator does not seem to accept the dag=dag argument, even though that argument is required to tie the task to its DAG.
dags/
- my_dag.py
sub_folder/
- __init__.py
- my_functions.py
My DAG contains task1 and task2. Each one calls a function from the sub-folder and passes it an argument to print.
my_dag.py
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from sub_folder.my_functions import task1, task2
args = {
    'owner': 'hello',
    'start_date': dt.datetime(2019, 1, 1),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2)
}

dag = DAG(
    'try',
    default_args=args,
    schedule_interval=dt.timedelta(minutes=2))

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    provide_context=True,
    op_kwargs={'idx': "World!"},
    dag=dag,
)

task1 >> task2
The callables are simple functions that print the argument passed to them.
my_functions.py
def task1(idx):
    print(f"Task 1! {idx}")

def task2(idx):
    print(f"Task 2! {idx}")
task1 keeps retrying and eventually fails. I looked into the logs to find out what is going on and found that it raises:
TypeError: task1() got an unexpected keyword argument 'dag'
I don't understand what is happening here. As far as I know, I have to pass dag=dag, because that argument tells the operator which DAG it belongs to.
Upvotes: 4
Views: 3095
Reputation: 1802
There is a conflict between the function my_functions.task1 and the PythonOperator named task1. Try:
import sub_folder.my_functions as mf  # changed

task1 = PythonOperator(
    task_id='task1',
    python_callable=mf.task1,  # changed
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)
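It may also be worth checking provide_context=True. The error says a dag keyword reached task1(). In Airflow 1.x, as far as I know, provide_context=True makes PythonOperator pass the task context (which contains a dag entry) to the callable as keyword arguments alongside op_kwargs. Below is a minimal plain-Python sketch of that situation. It does not use Airflow: call_like_operator and fake_context are hypothetical stand-ins made up for illustration, not Airflow APIs.

```python
# Plain-Python sketch, no Airflow required.
# call_like_operator is a hypothetical stand-in for what PythonOperator is
# assumed to do with provide_context=True: merge op_kwargs into the task
# context and pass everything to the callable as keyword arguments.


def call_like_operator(python_callable, op_kwargs, context):
    merged = dict(context)
    merged.update(op_kwargs)
    return python_callable(**merged)


def task1_strict(idx):
    # Accepts only idx, like task1 in my_functions.py.
    return f"Task 1! {idx}"


def task1_with_context(idx, **context):
    # Extra context entries are collected in **context and ignored here.
    return f"Task 1! {idx}"


# Hypothetical, heavily reduced context; a real one has many more keys.
fake_context = {"dag": "<DAG: try>", "ds": "2019-01-01"}

try:
    call_like_operator(task1_strict, {"idx": "Hello "}, fake_context)
    error_message = None
except TypeError as exc:
    # The strict signature rejects the extra keyword arguments.
    error_message = str(exc)

result = call_like_operator(task1_with_context, {"idx": "Hello "}, fake_context)

print(error_message)
print(result)
```

If this turns out to be the cause in your setup, either add **kwargs to the functions in my_functions.py or drop provide_context=True, since the functions do not use the context anyway.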
Upvotes: 4