Valeriy Solovyov

Reputation: 5648

How to store a response when the HTTP client has closed the connection?

I'm using the Falcon framework in Python to build JSON responses for a web API.

For instance, I have a function called logic() that runs for 30-90 minutes. I want something like this:

  1. When http-client asks for /api/somepath.json we call somepath_handle()
  2. somepath_handle() runs logic() in another thread/process
  3. When logic() is finished, thread is closed
  4. somepath_handle() reads response of logic() from return
  5. If somepath_handle() is killed before logic() finishes, the thread/process running logic() keeps running until it finishes

The code:

def somepath_handle():
    run_async_logic()
    response=wait_for_async_logic_response() # read response of logic()
    return_response(response)
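One possible sketch of the steps above, using a module-level ThreadPoolExecutor so submitted jobs outlive the handler that started them (all names here are hypothetical, not part of Falcon):

```python
import concurrent.futures
import uuid

# Module-level executor: jobs submitted here keep running even if the
# HTTP handler that submitted them returns (or is killed) early.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
jobs = {}  # job_id -> Future

def logic():
    # Placeholder for the real 30-90 minute job.
    return {"status": "done"}

def submit_logic():
    # Start logic() in the background and hand back a job id the
    # client can use to poll for the result later.
    job_id = str(uuid.uuid4())
    jobs[job_id] = executor.submit(logic)
    return job_id

def poll_logic(job_id):
    # Return the result if logic() has finished, otherwise None.
    future = jobs[job_id]
    return future.result() if future.done() else None
```

A handler could then return the job id immediately and let the client poll a second endpoint, instead of holding the connection open for the whole run.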

Upvotes: 0

Views: 122

Answers (2)

Valeriy Solovyov

Reputation: 5648

I am using a simple worker that processes commands from a queue. If you add a simple response storage on top of this, you can process any request and not lose its result when the connection is lost.

Example: here is the main module, which uses Falcon (falconframework.org) to respond to requests.

main.py:

from flow import Flow
import falcon
import threading
import storage

__version__ = 0.1
__author__ = '[email protected]'


app = falcon.API(
    media_type='application/json')

app.add_route('/flow', Flow())

THREADS_COUNT = 1
# start the workers that process the command queue
worker = storage.worker
for _ in xrange(THREADS_COUNT):
    thread = threading.Thread(target=worker)
    thread.daemon = True
    thread.start()

Here is the simple storage with the worker code, storage.py:

from Queue import Queue
import subprocess
import logging
main_queque = Queue()

def worker():
    while True:
        try:
            cmd = main_queque.get()
            # run the queued shell command and capture its output
            handler = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                shell=True)
            stdout, stderr = handler.communicate()
            logging.critical(
                "[queue_worker]: stdout:%s, stderr:%s, cmd:%s"
                % (stdout, stderr, cmd))
            main_queque.task_done()
        except Exception as error:
            logging.critical("[queue_worker:error] %s" % error)

Here is the class that handles the requests (POST, GET), flow.py:

import storage
import json
import falcon
import random

class Flow(object):

    def on_get(self, req, resp):
        storage_value = storage.main_queque.qsize()
        msg = {"qsize": storage_value}
        resp.body = json.dumps(msg, sort_keys=True, indent=4)
        resp.status = falcon.HTTP_200

    #curl -H "Content-Type: application/json" -d '{}'  http://10.206.102.81:8888/flow
    def on_post(self, req, resp):
        r = random.randint(1, 10000000000000)
        cmd = 'sleep 1;echo "ss %s"' % str(r)
        storage.main_queque.put(cmd)
        storage_value = cmd
        msg = {"value": storage_value}
        resp.body = json.dumps(msg, sort_keys=True, indent=4)
        resp.status = falcon.HTTP_200
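The answer mentions a "simple response storage" but the code above never shows one. A minimal sketch of what I mean, assuming a thread-safe dict keyed by request id (names are illustrative, not from the code above):

```python
import threading

# Hypothetical response store: the worker writes each finished result
# here keyed by request id, so a later GET can fetch it even if the
# original connection was lost.
_lock = threading.Lock()
_responses = {}

def save_response(request_id, result):
    with _lock:
        _responses[request_id] = result

def pop_response(request_id):
    # Return and remove the stored result, or None if not ready yet.
    with _lock:
        return _responses.pop(request_id, None)
```

The worker would call save_response() after communicate() returns, and on_get() could take a request id and call pop_response() to hand the stored result back to the client.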

Upvotes: 0

Pierre Goiffon

Reputation: 11

If your process takes such a long time, I advise you to send the result to the user by email, or maybe via a live notification system?
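For the email route, a minimal sketch with the standard library (the addresses, SMTP host, and helper names are assumptions for illustration):

```python
import smtplib
from email.message import EmailMessage

def build_result_email(recipient, job_id, result_text):
    # Package the finished job's output as an email message.
    msg = EmailMessage()
    msg["Subject"] = "Job %s finished" % job_id
    msg["From"] = "noreply@example.com"  # assumed sender address
    msg["To"] = recipient
    msg.set_content(result_text)
    return msg

def send_result(msg, host="localhost"):
    # Requires a reachable SMTP server at `host`; shown for illustration.
    with smtplib.SMTP(host) as smtp:
        smtp.send_message(msg)
```

The long-running worker would call send_result() when logic() completes, so the user gets the output without any HTTP connection staying open.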

Upvotes: 1
