Reputation: 5648
I'm using falcon framework in python to form json responses of web api.
For instance I have a function called logic()
that works for 30-90min. I want something like this:
somepath_handle()
somepath_handle()
runs logic()
in another thread/process logic()
is finished, thread is closedsomepath_handle()
reads response of logic()
from returnsomepath_handle()
was killed before logic()
was finished, then thread/etc with logic()
isn't stopped until it is finished The code:
def somepath_handle():
run_async_logic()
response=wait_for_async_logic_response() # read response of logic()
return_response(response)
Upvotes: 0
Views: 122
Reputation: 5648
I am using a simple worker to create the queue where I am processing some commands. If add simple response storage than there will be possibility to process any requests and not loss them when connection was lost.
Example: It's main function that used falconframework.org to response to requests.
main.py:
from flow import Flow
import falcon
import threading
import storage
__version__ = 0.1
__author__ = '[email protected]'
app = falcon.API(
media_type='application/json')
app.add_route('/flow', Flow())
THREADS_COUNT = 1
# adding the workers to process queue of command
worker = storage.worker
for _ in xrange(THREADS_COUNT):
thread = threading.Thread(target=worker)
thread.daemon = True
thread.start()
It's simple storage with worker code storage.py:
from Queue import Queue
import subprocess
import logging
main_queque = Queue()
def worker():
global main_roles_queque
while True:
try:
cmd = main_queque.get()
#do_work(item)
#time.sleep(5)
handler = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = handler.communicate()
logging.critical("[queue_worker]: stdout:%s, stderr:%s, cmd:%s" %(stdout, stderr, cmd))
main_queque.task_done()
except Exception as error:
logging.critical("[queue_worker:error] %s" %(error))
It's class that will process any requests [POST, GET] flow.py:
import storage
import json
import falcon
import random
class Flow(object):
def on_get(self, req, resp):
storage_value = storage.main_queque.qsize()
msg = {"qsize": storage_value}
resp.body = json.dumps(msg, sort_keys=True, indent=4)
resp.status = falcon.HTTP_200
#curl -H "Content-Type: application/json" -d '{}' http://10.206.102.81:8888/flow
def on_post(self, req, resp):
r = random.randint(1, 10000000000000)
cmd = 'sleep 1;echo "ss %s"' % str(r)
storage.main_queque.put(cmd)
storage_value = cmd
msg = {"value": storage_value}
resp.body = json.dumps(msg, sort_keys=True, indent=4)
resp.status = falcon.HTTP_200
Upvotes: 0
Reputation: 11
If your process takes such a long time, I advise you to send the result to the user using email, or maybe a live notification system ?
Upvotes: 1