Reputation: 505
I'm developing an application that sends data to the flask back end. Then the flask inserts the received into elastic search. Before inserting to elastic search it will check whether the id is existing or not.If the id exists it update or else it will insert to the index.
Sample code :
from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)
The problem here is, the requests are sending for every 5 seconds from the front end. So it conflicts sometimes,(ie) whenever a request is received with the id and at the same time the insertion process of the id takes place. The second request assumes that the id is not present in the database so it also inserts which turns 2 data with the same id in the index. What should I do to insert one at a time and the other should wait?
python - 3.6
Edited : Tried using semaphore :
from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
import threading
sLock = threading.Semaphore()
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
sLock.acquire()
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
sLock.release()
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)
Thanks in advance!
Upvotes: 0
Views: 698
Reputation: 3212
You could use a mget method and set a time threshold. In this way you don't send one request for time but you send a request with a list of ids - doc here
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from elasticsearch import helpers
idL = [] # create it before the flask route declaration
threshold = 5 #set a threshold in the same way
now = datetime.now()
delta = timedelta(seconds=30) # set a time threshold of 1 minute
def update(result):
for success, info in helpers.parallel_bulk(client= es, actions=createUpdateElem(result ):
if not success:
print(info)
def index(result):
for success, info in helpers.parallel_bulk(client= es, actions=createIndexElem(result ):
if not success:
print(info)
def createIndexElem(result):
for elem in result:
yield {
'_op_type': 'index',
'_index': 'database',
'_id': elem,
'_source': {'question': 'The life, universe and everything.'}
}
def createUpdateElem(result):
for elem in result:
yield {
'_op_type': 'update',
'_index': 'database',
'_id': elem,
'doc': {'question': 'The life, universe and everything.'}
}
def checkResponse(response, idL):
updateL = []
for elem in response['docs']:
if elem['_id'] in idL:
updateL.append(elem['_id'])
toBeIndexed = list(set(idL) - set(updateL))
return toBeIndexed,updateL
def multiget(idL):
response = es.mget(index = 'database',body = {'ids': idL})
doc2BeIndicized = checkResponse(response, idL)
now = datetime.now()
idL = []
return doc2BeIndicized
@app.route('/test',methods=['POST'])
def hello():
try:
id = request.form['id']
idL.append(id)
if len(idL) > threshold:
result = multiget(idL)
if result:
indexed, updated = result
if updated:
update(updated)
if indexed:
index(indexed)
elif (now + delta) > datetime.now():
result = multiget(idL)
if result:
indexed, updated = result
if updated:
update(updated)
if indexed:
index(indexed)
else:
continue
In the same way you could index or update a list of document with bulk - or parallel bulk that is better in a service because it uses multithread. doc here . Remember that you need to parse the response of mget call because in a list it is possible that only certain elements are presents in es and others not
Upvotes: 1