Reputation: 243
I have been in Airflow 1.10.14 for a long time, and now I'm trying to upgrade to Airflow 2.4.3 (latest?) I have built this dag in the new format in hopes to assimilate the language and understand how the new format works. Below is my dag:
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
import glob
path = '~/airflow/staging/gcs/offrs2/'
clear_Staging_Folders = """
rm -rf {}OFFRS2/LEADS*.*
""".format(Variable.get("temp_directory"))
@dag(
schedule_interval='@daily',
start_date=datetime(2022, 11, 1),
catchup=False,
tags=['offrs2', 'LEADS']
)
def taskflow():
CLEAR_STAGING = BashOperator(
task_id='Clear_Folders',
bash_command=clear_Staging_Folders,
dag=dag,
)
BQ_Output = BigQueryInsertJobOperator(
task_id='BQ_Output',
configuration={
"query": {
"query": '~/airflow/sql/Leads/Leads_Export.sql',
"useLegacySql": False
}
}
)
Prep_MSSQL = MsSqlOperator(
task_id='Prep_DB3_Table',
mssql_conn_id = 'db.offrs.com',
sql='truncate table offrs_staging..LEADS;'
)
@task
def Load_Staging_Table():
for files in glob.glob(path + 'LEADS*.csv'):
print(files)
CLEAR_STAGING >> BQ_Output >> Load_Staging_Table()
dag = taskflow()
when I send this up, I'm getting the below error:
Broken DAG: [/home/airflow/airflow/dags/BQ_OFFRS2_Leads.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 376, in apply_defaults
task_group = TaskGroupContext.get_current_task_group(dag)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/task_group.py", line 490, in get_current_task_group
return dag.task_group
AttributeError: 'function' object has no attribute 'task_group'
As I look at my code, I don't have a specified task_group
.
Where am I going wrong here?
Thank you!
Upvotes: 2
Views: 635
Reputation: 9363
You forgot to remove an undefined dag
variable in CLEAR_STAGING
. When you are using decorator, remove dag=dag
.
CLEAR_STAGING = BashOperator(
task_id='Clear_Folders',
bash_command=clear_Staging_Folders,
# dag=dag <== Remove this
)
Upvotes: 2