How to process requests one by one in Flask?

I'm developing an application that sends data to a Flask back end. Flask then inserts the received data into Elasticsearch. Before inserting, it checks whether the id already exists: if it does, the existing document is updated; otherwise a new document is inserted into the index.

Sample code:

from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch

app = Flask(__name__)

@app.route('/test', methods=['POST'])
def hello():
    try:
        id = request.form['id']
        database = "sample"
        es = Elasticsearch("localhost", port=9200)
        # check whether a document with this id already exists
        cols = es.search(index=database, body={"query": {"match": {"id": id}}})
        present = False
        if cols['hits']['hits']:
            x1 = cols['hits']['hits'][0]['_source']
            eid = cols['hits']['hits'][0]['_id']
            present = True
        if not present:
            newvalues = {"url": "hello", 'id': id}
            es.index(index=database, doc_type="logs", body=newvalues)
        else:  # if already there, append data
            newvalues = {}
            es.update(index=database, doc_type='logs', id=eid, body={"doc": newvalues})
        return jsonify({'status': 'success'})
    except jwt.InvalidTokenError as e:
        print(e)
        return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})

if __name__ == "__main__":
    try:
        app.run(host="localhost", port=5005, debug=True, processes=1)
    except Exception as e:
        print("exception in test", e)

The problem is that the front end sends a request every 5 seconds, so sometimes two requests conflict: a request arrives with an id while the insertion for that same id is still in progress. The second request sees that the id is not yet in the index, so it inserts it as well, leaving two documents with the same id. What should I do to insert one at a time and make the other request wait?

Python 3.6

Edit: I tried using a semaphore:

from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
import threading

sLock = threading.Semaphore()
app = Flask(__name__)

@app.route('/test', methods=['POST'])
def hello():
    sLock.acquire()
    try:
        id = request.form['id']
        database = "sample"
        es = Elasticsearch("localhost", port=9200)
        cols = es.search(index=database, body={"query": {"match": {"id": id}}})
        present = False
        if cols['hits']['hits']:
            x1 = cols['hits']['hits'][0]['_source']
            eid = cols['hits']['hits'][0]['_id']
            present = True
        if not present:
            newvalues = {"url": "hello", 'id': id}
            es.index(index=database, doc_type="logs", body=newvalues)
        else:  # if already there, append data
            newvalues = {}
            es.update(index=database, doc_type='logs', id=eid, body={"doc": newvalues})
        return jsonify({'status': 'success'})
    except jwt.InvalidTokenError as e:
        print(e)
        return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
    finally:
        # release in a finally block so the lock is freed even if the request fails
        sLock.release()

if __name__ == "__main__":
    try:
        app.run(host="localhost", port=5005, debug=True, processes=1)
    except Exception as e:
        print("exception in test", e)

Thanks in advance!

Upvotes: 0

Views: 698

Answers (1)

Lupanoide

Reputation: 3212

You could use the mget method together with a time threshold. That way you don't send one request to Elasticsearch per incoming id; instead you collect the ids and send a single request with the whole list - doc here

from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch("localhost", port=9200)  # same client the question already uses

idL = []  # create it before the flask route declaration
threshold = 5  # set a batch-size threshold in the same way
now = datetime.now()
delta = timedelta(seconds=30)  # set a time threshold of 30 seconds

def update(result):
    for success, info in helpers.parallel_bulk(client=es, actions=createUpdateElem(result)):
        if not success:
            print(info)

def index(result):
    for success, info in helpers.parallel_bulk(client=es, actions=createIndexElem(result)):
        if not success:
            print(info)

def createIndexElem(result):
    for elem in result:
        yield {
            '_op_type': 'index',
            '_index': 'database',
            '_id': elem,
            '_source': {'question': 'The life, universe and everything.'}
        }

def createUpdateElem(result):
    for elem in result:
        yield {
            '_op_type': 'update',
            '_index': 'database',
            '_id': elem,
            'doc': {'question': 'The life, universe and everything.'}
        }

def checkResponse(response, idL):
    updateL = []
    for elem in response['docs']:
        if elem.get('found'):  # only ids that already exist in the index go to the update list
            updateL.append(elem['_id'])
    toBeIndexed = list(set(idL) - set(updateL))
    return toBeIndexed, updateL

def multiget(idL):
    global now
    response = es.mget(index='database', body={'ids': idL})
    doc2BeIndicized = checkResponse(response, idL)
    now = datetime.now()  # reset the time window
    idL.clear()           # empty the shared list in place instead of rebinding a local name
    return doc2BeIndicized


@app.route('/test', methods=['POST'])
def hello():
    id = request.form['id']
    idL.append(id)
    # flush the batch when it is big enough or when the time window has elapsed
    if len(idL) > threshold or datetime.now() > (now + delta):
        result = multiget(idL)
        if result:
            indexed, updated = result
            if updated:
                update(updated)
            if indexed:
                index(indexed)
    return jsonify({'status': 'success'})

In the same way you can index or update a list of documents with bulk - or with parallel_bulk, which is better in a service because it uses multiple threads. doc here . Remember that you need to parse the response of the mget call, because when you send a list of ids it is possible that only some of them are present in Elasticsearch and the others are not.
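As a rough sketch of that last point (the index name, field values, and helper names here are placeholders, not taken from the answer): the mget response marks missing ids with found: False, so you can split the batch into ids to update and ids to index straight from that flag before building the bulk actions.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("localhost", port=9200)

def split_ids(id_list, index_name="sample"):
    # existing documents come back with found=True, missing ones with found=False
    response = es.mget(index=index_name, body={'ids': id_list})
    to_update = [doc['_id'] for doc in response['docs'] if doc.get('found')]
    to_index = [i for i in id_list if i not in to_update]
    return to_index, to_update

def flush(id_list, index_name="sample"):
    to_index, to_update = split_ids(id_list, index_name)
    actions = [{'_op_type': 'index', '_index': index_name, '_id': i, '_source': {'id': i}}
               for i in to_index]
    actions += [{'_op_type': 'update', '_index': index_name, '_id': i, 'doc': {'id': i}}
                for i in to_update]
    # helpers.bulk is the simpler single-threaded variant; parallel_bulk does the
    # same job across several worker threads
    helpers.bulk(es, actions)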

Upvotes: 1
