Reputation: 1018
Recently I set up a Flask POST endpoint to write data into Impala DB via the Impyla module.
Env: Python 3.6.5 on CentOS.
Impala version: impalad version 2.6.0-cdh5.8.0
api.py:
from flask import Flask, request, abort, Response
from flask_cors import CORS
import json
from impala.dbapi import connect
import sys
import re
from datetime import datetime
app = application = Flask(__name__)
CORS(app)
conn = connect(host='datanode2', port=21050,
user='user', database='testdb')
@app.route("/api/endpoint", methods=['POST'])
def post_data():
# if not request.json:
# abort(400)
params = request.get_json(force=True) # getting request data
print(">>>>>> ", params, flush=True)
params['log_time'] = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
# params['page_url'] = re.sub(
# '[^a-zA-Z0-9-_*.]', '', re.sub(':', '_', params['page_url']))
try:
cursor = conn.cursor()
sql = "INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES (%s, %s, %s, %s, %s, %s)"
values = (params['page_title'], params['page_url'], params['log_time'],
params['machine'], params['clicks'], params['id'])
print(">>>>>> " + sql % values, file=sys.stderr, flush=True)
cursor.execute(sql, values)
print(
f">>>>>> Data Written Successfully", file=sys.stderr, flush=True)
return Response(json.dumps({'success': True}), 201, mimetype="application/json")
except Exception as e:
print(e, file=sys.stderr, flush=True)
return Response(json.dumps({'success': False}), 400, mimetype="application/json")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5008, debug=True)
req.py:
import requests as r
url = "http://123.234.345.456:30001/"
# url = "https://stackoverflow.com/questions/ask"
res = r.post('http://localhost:5008/api/endpoint',
json={
"page_title": "Home",
"page_url": url,
"machine": "Mac OS",
"clicks": 16,
"id": "60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db"
}
)
if res.ok:
print(res.json())
else:
print('Error!')
I ran the flask api with python api.py
then test it with python req.py
.
The flask server gives this error:
>>>>>> {'page_title': 'Home', 'page_url': 'http://123.234.345.456:30001/', 'machine': 'Mac OS', 'clicks': 16, 'id': '60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db'}
>>>>>> INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES (Home, http://123.234.345.456:30001/, 2018-12-12 16-14-04, Mac OS, 16, 60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db)
AnalysisException: Syntax error in line 1:
..., 'http://123.234.345.456'2018-12-12 16-14-04'0001/', ...
^
Encountered: INTEGER LITERAL
Expected: AND, AS, ASC, BETWEEN, CROSS, DESC, DIV, ELSE, END, FOLLOWING, FROM, FULL, GROUP, HAVING, ILIKE, IN, INNER, IREGEXP, IS, JOIN, LEFT, LIKE, LIMIT, NOT, NULLS, OFFSET, OR, ORDER, PRECEDING, RANGE, REGEXP, RIGHT, RLIKE, ROWS, THEN, UNION, WHEN, WHERE, COMMA, IDENTIFIER
CAUSED BY: Exception: Syntax error
This Error is kind of annoying:
I tried directly inserting sql command inside impala-shell and it works.
When the page_url is the only parameter, it works fine, too.
So it is some kinds of conditional character escaping issue? I managed to bypass this issue by tweaking the url with some regular expression (Uncomment Line 27 - 28). But this is really annoying, I don't want to clean my data because of this.
When I check other people's trials, it is thought that adding a pair of quotes to each inserting values will work. However, how can I do this when using string formatting, and it has to take place before cursor.execute(sql, values)
?
Upvotes: 2
Views: 1474
Reputation: 1018
After some struggling, and great help from @Scratch'N'Purr and @msafiullah at Parameter substitution issue #317, I managed to make it work. This is kind of complicated so I will post the full code for documentation:
Reason of error: colon escaping issue via the Impyla API.
Solution: Use customised escaping function to process data and adopt sql injection (Python's string formatting way to substitute parameters) instead of the standard Python DB API e.g. cursor.execute(sql, values)
.
api.py:
from flask import Flask, request, abort, Response
from flask_cors import CORS
import json
from impala.dbapi import connect
from impala.util import _escape
import sys
from datetime import datetime
import six
app = application = Flask(__name__)
CORS(app)
conn = connect(host='datanode2', port=21050,
user='user', database='testdb')
def parameterize(value): # by msafiullah
if value is None:
return "NULL"
elif isinstance(value, six.string_types):
return "'" + _escape(value) + "'"
else:
return str(value)
@app.route("/api/endpoint", methods=['POST'])
def post_data():
if not request.json:
abort(400)
params = request.get_json(force=True) # getting request data
print(">>>>>> ", params, flush=True)
params['log_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
cursor = conn.cursor()
sql = 'INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES ( CAST({} AS VARCHAR(64)), {}, {}, CAST({} AS VARCHAR(32)) , {}, CAST({} AS VARCHAR(32)))'\
.format(parameterize(params['page_title']), parameterize(params['page_url']), parameterize(params['log_time']), parameterize(params['machine']), params['clicks'], parameterize(params['id']))
print(">>>>>> " + sql, file=sys.stderr, flush=True)
cursor.execute(sql)
print(
f">>>>>> Data Written Successfully", file=sys.stderr, flush=True)
return Response(json.dumps({'success': True}), 201, mimetype="application/json")
except Exception as e:
print(e, file=sys.stderr, flush=True)
return Response(json.dumps({'success': False}), 400, mimetype="application/json")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5008, debug=True)
req.py is same as Question.
table
schema:
CREATE TABLE if not exists table (
id VARCHAR(36),
machine VARCHAR(32),
clicks INT,
page_title VARCHAR(64),
page_url STRING,
log_time TIMESTAMP
);
Flask's server output:
>>>>>> {'page_title': 'Home', 'page_url': 'http://123.234.345.456:30001/', 'machine': 'Mac OS', 'clicks': 16, 'id': '60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db'}
>>>>>> INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES ( CAST('Home' AS VARCHAR(64)), 'http://123.234.345.456:30001/', '2018-12-14 17:27:29', CAST('Mac OS' AS VARCHAR(32)) , 16, CAST('60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db' AS VARCHAR(32)))
>>>>>> Data Written Successfully
127.0.0.1 - - [14/Dec/2018 17:27:29] "POST /api/endpoint HTTP/1.1" 201 -
Inside Impala-shell, select * from table
will give:
+----------------------------------+--------+--------------+------------+----------------------------------------------------------------------+---------------------+
| id | machine | clicks | page_title | page_url | log_time |
+----------------------------------+--------+--------------+------------+----------------------------------------------------------------------+---------------------+
| 60cd1d79-eda7-44c2-a4ec-ffdd5d6a | Mac OS | 16 | Home | http://123.234.345.456:30001/ | 2018-12-14 17:27:29 |
+----------------------------------+--------+--------------+------------+----------------------------------------------------------------------+---------------------+
Basically, only numbers (e.g. INT
type) do not need to go through the parameterize()
cleaning/escape process. Other types such as VARCHAR
, CHAR
, STRING
, TIMESTAMP
(because of the colons) shall be escaped proeprly to safely insert through the Impyla API.
Upvotes: 3
Reputation: 10399
Impyla or other impala based python libraries don't support parameterized queries, the way that traditional SQL dbs do. The only solution I have come across was to wrap the insert values with quotes if the values are defined as string/timestamp.
You mention how to do this when using string formatting before executing the query? Simple, just apply the string formatting and then insert the formatted value.
In your example, let's assume your table had the following type definitions:
CREATE TABLE table (
page_title VARCHAR(64),
page_url STRING,
log_time TIMESTAMP,
machine VARCHAR(64),
clicks INT,
id CHAR(36)
)
Then your insert statement would be:
sql = "INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES ('%s', '%s', '%s', '%s', %s, '%s')" # note the single quotes around the string/timestamp types
Now since log_time
is a timestamp type, you'll have to format your datetime.now()
to the yyyy-MM-dd HH:mm:ss
format.
params['log_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
If you had defined log_time
as STRING instead of TIMESTAMP, then your format of %Y-%m-%d %H-%M-%S
would work.
Finally, execute:
values = (params['page_title'], params['page_url'], params['log_time'],
params['machine'], params['clicks'], params['id'])
cursor.execute(sql, values)
Note that this method only works when you're working with basic data types such as numerics or strings. Anything complex such as arrays or structs won't work.
Upvotes: 1