I have an Airflow instance on EC2 that runs the webserver and scheduler. I want to use a MySQL RDS instance as the backend metadata database instead of the default SQLite. I replaced the sql_alchemy_conn line in airflow.cfg so that it connects to RDS through the pymysql driver:
#sql_alchemy_conn = sqlite:////home/cloud-user/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://admin:<PASSWORD>@airflow-db.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com:3306/airflow
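For reference, here is a minimal sketch of how a URL of that shape breaks down into dialect, driver, host, port, and database. It only uses the standard library (Python 3's `urllib.parse`, not SQLAlchemy's own parser), the helper name `describe_conn` is made up for illustration, and the host and password are placeholders rather than the real values.

```python
from urllib.parse import urlsplit


def describe_conn(url):
    """Split a SQLAlchemy-style URL into its parts without connecting."""
    parts = urlsplit(url)
    # The scheme has the form "dialect+driver", e.g. "mysql+pymysql".
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "username": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


# Placeholder credentials and host, not the values from my airflow.cfg.
info = describe_conn(
    "mysql+pymysql://admin:secret@example-host.rds.amazonaws.com:3306/airflow"
)
print(info)
```

A password containing characters such as `@` or `/` would need to be percent-encoded for a URL like this to split correctly.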
The connection seems to work fine: I can log in to the RDS instance and query tables from a MySQL client set up on my EC2 instance.
When I toggle a DAG on or off, I get this nasty Python stack trace in my shell:
[2019-09-30 14:00:51,774] {app.py:1891} ERROR - Exception on /admin/airflow/paused [POST]
Traceback (most recent call last):
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 2446, in wsgi_app
response = self.full_dispatch_request()
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1820, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
rv = self.dispatch_request()
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1935, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_login/utils.py", line 258, in decorated_view
return func(*args, **kwargs)
File "/home/cloud-user/.local/lib/python2.7/site-packages/airflow/www/utils.py", line 279, in wrapper
session.commit()
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
self.transaction.commit()
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit
self._prepare_impl()
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
self.session.flush()
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2459, in flush
self._flush(objects)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
transaction.rollback(_capture_exception=True)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2557, in _flush
flush_context.execute()
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
statement, params
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 988, in execute
return meth(self, multiparams, params)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
distilled_params,
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1253, in _execute_context
e, statement, parameters, cursor, context
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1473, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1249, in _execute_context
cursor, statement, parameters, context
File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
cursor.execute(statement, parameters)
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
result = self._query(query)
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
conn.query(q)
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 517, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 732, in _read_query_result
result.read()
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 1075, in read
first_packet = self.connection._read_packet()
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 684, in _read_packet
packet.check_error()
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/protocol.py", line 220, in check_error
err.raise_mysql_exception(self._data)
File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/err.py", line 109, in raise_mysql_exception
raise errorclass(errno, errval)
ProgrammingError: (pymysql.err.ProgrammingError) (1064, u"You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '[(\\'is_paused\\', u\\'true\\'), (\\'dag_id\\', u\\'test\\')]'')' at line 1")
[SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%(dttm)s, %(dag_id)s, %(task_id)s, %(event)s, %(execution_date)s, %(owner)s, %(extra)s)]
[parameters: {'task_id': None, 'extra': "[('is_paused', u'true'), ('dag_id', u'test')]", 'execution_date': None, 'event': 'paused', 'owner': 'anonymous', 'dttm': datetime.datetime(2019, 9, 30, 18, 0, 51, 768073, tzinfo=<Timezone [UTC]>), 'dag_id': u'test'}]
(Background on this error at: http://sqlalche.me/e/f405)
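To check whether the statement itself is the problem, here is a minimal sketch that runs the same INSERT against an in-memory SQLite database with the logged parameter values bound by the driver. It assumes a simplified `log` table (the column types are guesses, not Airflow's actual schema), uses SQLite's `:name` placeholders instead of pymysql's `%(name)s` style, and is written for Python 3. If it succeeds, that would only suggest the SQL text is fine and that the issue may lie in how the `extra` value is escaped on the way to MySQL; it does not prove it.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")

# Simplified stand-in for Airflow's log table; the column types are assumptions.
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, task_id TEXT, event TEXT, "
    "execution_date TEXT, owner TEXT, extra TEXT)"
)

# Same values as in the logged parameters, with the timestamp stored as text.
params = {
    "dttm": datetime(2019, 9, 30, 18, 0, 51, 768073, tzinfo=timezone.utc).isoformat(),
    "dag_id": "test",
    "task_id": None,
    "event": "paused",
    "execution_date": None,
    "owner": "anonymous",
    "extra": "[('is_paused', u'true'), ('dag_id', u'test')]",
}

# The driver binds each value, so the quotes inside `extra` need no manual escaping.
conn.execute(
    "INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) "
    "VALUES (:dttm, :dag_id, :task_id, :event, :execution_date, :owner, :extra)",
    params,
)

row = conn.execute("SELECT event, extra FROM log").fetchone()
print(row)
```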
From what I saw in the Airflow documentation, I'm making the change correctly, and the 'airflow initdb' and 'airflow resetdb' commands both execute without error.
I have already spent a fair amount of time googling this error, but I found no clear answers. Am I missing a prerequisite, or should I be using a different connector?
EDIT: I'm using Python 2.7, as seen in the stack trace. Airflow claims compatibility in the short term, but I see that another SO user's problems went away after upgrading to Python 3.6: Link to other solution. I'll try this and update if it works.