cff
cff

Reputation: 23

AttributeError: 'NoneType' object has no attribute 'dag_id'

I get this error and I have no idea what it is happening. I'm making a test in local airflow before putting into production.

                          ____/ (  (    )   )  \___
                         /( (  (  )   _    ))  )   )\
                       ((     (   )(    )  )   (   )  )
                     ((/  ( _(   )   (   _) ) (  () )  )
                    ( (  ( (_)   ((    (   )  .((_ ) .  )_
                   ( (  )    (      (  )    )   ) . ) (   )
                  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
                  ( (  (   ) (  )   (  ))     ) _)(   )  )  )
                 ( (  ( \ ) (    (_  ( ) ( )  )   ) )  )) ( )
                  (  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
                 ( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
                  ((  (   )(    (     _    )   _) _(_ (  (_ )
                   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
                   ((__)        \\||lll|l||///          \_))
                            (   /(/ (  )  ) )\   )
                          (    ( ( ( | | ) ) )\   )
                           (   /(| / ( )) ) ) )) )
                         (     ( ((((_(|)_)))))     )
                          (      ||\(|(|)|/||     )
                        (        |(||(||)||||        )
                          (     //|/l|||)|\\ \     )
                        (/ / //  /|//||||\\  \ \  \ _)
-------------------------------------------------------------------------------
Node: cfb554ee0699
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 290, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 945, in log
    dag_id=dag.dag_id, task_id=task_id,
AttributeError: 'NoneType' object has no attribute 'dag_id'

The SUBDAG that is failing has the following code:

from airflow import DAG
from airflow.models import Variable
from airflow.operators import SalesforcetoMySQL

dag_name = "salesforce_bikes"
template_path = Variable.get("airflow_home") + "/templates/" + dag_name

default_args = {
    "owner": "carla",
    "depends_on_past": False,
    "start_date": "2021-03-18 07:00:00",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),

}

dag = DAG(
    dag_name, default_args=default_args, schedule_interval="45 6 * * *", template_searchpath=template_path, catchup=False
)

salesforce_bikes = SalesforcetoMySQL(
    task_id="salesforce_bikes",
    sf_conn_id="salesforce",
    sf_query="salesforce_bikes.sql",
    dest_conn_id="carla_mysql",
    dest_schema="salesforce",
    dest_trunc_table=True,
    dest_table="salesforce_bikes",
    dest_insert_table_script="insert_salesforce_bikes.sql",
    object="bikes",
    params={"dest_table": "salesforce_bikes"},
    dag=dag,
)

These DAG wants to populate data from Salesforce to MySQL. As well, when I try to trigger the general DAG I find a message of "too many connections".

Upvotes: 1

Views: 4801

Answers (2)

TobyEvans
TobyEvans

Reputation: 1461

I had something similar, I had a function that created a subdag, but I forgot to return the dag I created at the end of the function ... 🤦🏻

Upvotes: 2

Sezai Burak Kantarcı
Sezai Burak Kantarcı

Reputation: 225

You cannot initiate a DAG without a name given, like it's told in the Error.

To do that, you can use dag_id paramater. Do the following change at line 16, in your code:

from airflow import DAG
from airflow.models import Variable
from airflow.operators import SalesforcetoMySQL
from airflow.operators.dummy import DummyOperator

dag_name = "salesforce_bikes"
template_path = Variable.get("airflow_home") + "/templates/" + dag_name

default_args = {
    "owner": "carla",
    "depends_on_past": False,
    "start_date": "2021-03-18 07:00:00",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),

}

dag = DAG(
    dag_id = dag_name, default_args=default_args, schedule_interval="45 6 * * *", template_searchpath=template_path, catchup=False
)

salesforce_bikes = SalesforcetoMySQL(
    task_id="salesforce_bikes",
    sf_conn_id="salesforce",
    sf_query="salesforce_bikes.sql",
    dest_conn_id="carla_mysql",
    dest_schema="salesforce",
    dest_trunc_table=True,
    dest_table="salesforce_bikes",
    dest_insert_table_script="insert_salesforce_bikes.sql",
    object="bikes",
    params={"dest_table": "salesforce_bikes"},
    dag=dag,
)

dummy_task = DummyOperator(
    task_id = "Starter",
    dag = dag
)

dummy_task >> salesforce_bikes

This should work :)

Upvotes: 0

Related Questions