arcee123

Reputation: 243

Writing an Airflow 2 DAG

I have been on Airflow 1.10.14 for a long time, and now I'm trying to upgrade to Airflow 2.4.3 (latest?). I built this DAG in the new decorator-based format, hoping to learn the syntax and understand how the new format works. Below is my DAG:

from airflow.decorators import dag, task
from airflow.models import Variable

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.operators.bash import BashOperator

from datetime import datetime
import glob

path = '~/airflow/staging/gcs/offrs2/'

clear_Staging_Folders = """
rm -rf {}OFFRS2/LEADS*.*
""".format(Variable.get("temp_directory"))


@dag(
    schedule_interval='@daily',
    start_date=datetime(2022, 11, 1),
    catchup=False,
    tags=['offrs2', 'LEADS']
)
def taskflow():

    CLEAR_STAGING = BashOperator(
        task_id='Clear_Folders',
        bash_command=clear_Staging_Folders,
        dag=dag,
    )

    BQ_Output = BigQueryInsertJobOperator(
        task_id='BQ_Output',
        configuration={
            "query": {
                "query": '~/airflow/sql/Leads/Leads_Export.sql',
                "useLegacySql": False
            }
        }
    )

    Prep_MSSQL = MsSqlOperator(
        task_id='Prep_DB3_Table',
        mssql_conn_id = 'db.offrs.com',
        sql='truncate table offrs_staging..LEADS;'
    )

    @task
    def Load_Staging_Table():
        for files in glob.glob(path + 'LEADS*.csv'):
            print(files)


    CLEAR_STAGING >> BQ_Output >> Load_Staging_Table()

dag = taskflow()

When I deploy this, I get the error below:

Broken DAG: [/home/airflow/airflow/dags/BQ_OFFRS2_Leads.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 376, in apply_defaults
    task_group = TaskGroupContext.get_current_task_group(dag)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/task_group.py", line 490, in get_current_task_group
    return dag.task_group
AttributeError: 'function' object has no attribute 'task_group'

Looking at my code, I don't specify a task_group anywhere. Where am I going wrong here? Thank you!

Upvotes: 2

Views: 635

Answers (1)

Emma

Reputation: 9363

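The problem is the dag=dag argument in CLEAR_STAGING. Inside taskflow(), the name dag is not a DAG object: it still refers to the dag decorator function imported from airflow.decorators, which is why the traceback ends with 'function' object has no attribute 'task_group'. When you use the @dag decorator, operators created inside the decorated function are attached to the DAG automatically, so remove dag=dag.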

CLEAR_STAGING = BashOperator(
    task_id='Clear_Folders',
    bash_command=clear_Staging_Folders,
    # dag=dag   <== Remove this
)
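To see where the error message comes from, here is a minimal sketch that reproduces the same name mix-up without Airflow. The `dag` decorator and `get_task_group` helper below are hypothetical stand-ins written for illustration, not Airflow's real implementation; only the name lookup behaves the same way.

```python
def dag(func):
    """Stand-in for airflow.decorators.dag (assumption: simply returns func)."""
    return func


def get_task_group(dag_obj):
    """Stand-in for Airflow's lookup, which reads .task_group from a DAG object."""
    return dag_obj.task_group


@dag
def taskflow():
    # While taskflow() is running, the module-level name `dag` is still the
    # decorator function: the assignment `dag = taskflow()` has not completed.
    return get_task_group(dag)


try:
    dag = taskflow()
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

Running this should print the same AttributeError text as the last line of the traceback in the question, because a plain function has no task_group attribute.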

Upvotes: 2
