Reputation: 23
I get this error and have no idea what is happening. I'm testing in a local Airflow instance before putting it into production.
____/ ( ( ) ) \___
/( ( ( ) _ )) ) )\
(( ( )( ) ) ( ) )
((/ ( _( ) ( _) ) ( () ) )
( ( ( (_) (( ( ) .((_ ) . )_
( ( ) ( ( ) ) ) . ) ( )
( ( ( ( ) ( _ ( _) ). ) . ) ) ( )
( ( ( ) ( ) ( )) ) _)( ) ) )
( ( ( \ ) ( (_ ( ) ( ) ) ) ) )) ( )
( ( ( ( (_ ( ) ( _ ) ) ( ) ) )
( ( ( ( ( ) (_ ) ) ) _) ) _( ( )
(( ( )( ( _ ) _) _(_ ( (_ )
(_((__(_(__(( ( ( | ) ) ) )_))__))_)___)
((__) \\||lll|l||/// \_))
( /(/ ( ) ) )\ )
( ( ( ( | | ) ) )\ )
( /(| / ( )) ) ) )) )
( ( ((((_(|)_))))) )
( ||\(|(|)|/|| )
( |(||(||)|||| )
( //|/l|||)|\\ \ )
(/ / // /|//||||\\ \ \ \ _)
-------------------------------------------------------------------------------
Node: cfb554ee0699
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line 290, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line 945, in log
dag_id=dag.dag_id, task_id=task_id,
AttributeError: 'NoneType' object has no attribute 'dag_id'
The SUBDAG that is failing has the following code:
from datetime import timedelta  # needed for retry_delay below

from airflow import DAG
from airflow.models import Variable
from airflow.operators import SalesforcetoMySQL

dag_name = "salesforce_bikes"
template_path = Variable.get("airflow_home") + "/templates/" + dag_name

default_args = {
    "owner": "carla",
    "depends_on_past": False,
    "start_date": "2021-03-18 07:00:00",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval="45 6 * * *",
    template_searchpath=template_path,
    catchup=False,
)

salesforce_bikes = SalesforcetoMySQL(
    task_id="salesforce_bikes",
    sf_conn_id="salesforce",
    sf_query="salesforce_bikes.sql",
    dest_conn_id="carla_mysql",
    dest_schema="salesforce",
    dest_trunc_table=True,
    dest_table="salesforce_bikes",
    dest_insert_table_script="insert_salesforce_bikes.sql",
    object="bikes",
    params={"dest_table": "salesforce_bikes"},
    dag=dag,
)
This DAG is meant to load data from Salesforce into MySQL. Also, when I try to trigger the general DAG, I get a "too many connections" message.
Upvotes: 1
Views: 4801
Reputation: 1461
I had something similar: I had a function that created a subdag, but I forgot to return the DAG I created at the end of the function ... 🤦🏻
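To make that concrete, here is a minimal sketch of the mistake. It uses a hypothetical stand-in class (FakeDag) instead of the real airflow.DAG so it runs without Airflow installed, and the function names are made up for illustration. The point is only that a factory function with no return hands back None, and anything that then reads .dag_id fails with the same message as in the traceback above.

```python
class FakeDag:
    """Hypothetical stand-in for airflow.DAG; only stores a dag_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def build_subdag_broken(parent_id, child_id):
    dag = FakeDag(f"{parent_id}.{child_id}")
    # Bug: the function ends without `return dag`, so the caller gets None.


def build_subdag_fixed(parent_id, child_id):
    dag = FakeDag(f"{parent_id}.{child_id}")
    return dag  # the caller now receives the DAG object


def describe(dag):
    """Return the dag_id, or the error text if `dag` is not a DAG."""
    try:
        return dag.dag_id
    except AttributeError as exc:
        return f"AttributeError: {exc}"


broken_result = describe(build_subdag_broken("salesforce", "salesforce_bikes"))
fixed_result = describe(build_subdag_fixed("salesforce", "salesforce_bikes"))

print(broken_result)
print(fixed_result)
```

If your subdag comes from a factory function like this, checking that it ends with a return of the DAG object is a quick first thing to rule out.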
Upvotes: 2
Reputation: 225
You cannot instantiate a DAG without giving it a name, like the error says. To do that, you can use the dag_id parameter. Make the following change to the DAG(...) call in your code:
from datetime import timedelta  # needed for retry_delay below

from airflow import DAG
from airflow.models import Variable
from airflow.operators import SalesforcetoMySQL
# Airflow 2.x import path; on Airflow 1.10.x use airflow.operators.dummy_operator
from airflow.operators.dummy import DummyOperator

dag_name = "salesforce_bikes"
template_path = Variable.get("airflow_home") + "/templates/" + dag_name

default_args = {
    "owner": "carla",
    "depends_on_past": False,
    "start_date": "2021-03-18 07:00:00",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Pass the DAG name explicitly through the dag_id keyword
dag = DAG(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval="45 6 * * *",
    template_searchpath=template_path,
    catchup=False,
)

salesforce_bikes = SalesforcetoMySQL(
    task_id="salesforce_bikes",
    sf_conn_id="salesforce",
    sf_query="salesforce_bikes.sql",
    dest_conn_id="carla_mysql",
    dest_schema="salesforce",
    dest_trunc_table=True,
    dest_table="salesforce_bikes",
    dest_insert_table_script="insert_salesforce_bikes.sql",
    object="bikes",
    params={"dest_table": "salesforce_bikes"},
    dag=dag,
)

# Starting task that runs before the Salesforce load
dummy_task = DummyOperator(
    task_id="Starter",
    dag=dag,
)

dummy_task >> salesforce_bikes
This should work :)
Upvotes: 0