Jacob

Reputation: 359

Starting Background Daemon in Flask App

So I'm building a long-running query web app for internal use.

My goal is to have a flask app with a daemon process that starts when the server starts, that will update a global dictionary object.

I don't necessarily have any sample code to post, as I've tried to accomplish this many ways and none have been successful.

The daemon will create a process pool (multiprocessing.Pool) to loop through all database instances, running a couple of queries on each.

It seems that no matter how I implement this (right now, using the Flask development server), it locks up the app and nothing else can be done while it's running. I have tried reading through a bunch of documentation, but as usual a lot of prior knowledge is assumed and I end up overwhelmed.
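For reference, the pool-based polling described above can be sketched roughly like this. This is only an illustration of the shape of the idea; `run_queries` and the instance names are hypothetical stand-ins for the real per-database work:

```python
from multiprocessing import Pool

def run_queries(instance_name):
    # Hypothetical stand-in for "run a couple of queries" against one
    # database instance; a real version would open a connection here.
    return instance_name, {'row_count': len(instance_name)}

def poll_all_instances(instances):
    # Fan the per-instance work out to a pool of worker processes
    # and gather the results back into a single dictionary.
    with Pool(processes=4) as pool:
        return dict(pool.map(run_queries, instances))

if __name__ == '__main__':
    print(poll_all_instances(['db01', 'db02', 'db03']))
```

Run on its own this works fine; the trouble described in the question starts when the same loop runs inside the web process.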

I'm wondering if anyone can offer some guidance, even if it's just places I can look, because I have searched all over for 'flask startup routine' and similar and have found nothing of use. It seems that when I deploy this to our server, I may be able to define some startup daemons in my .wsgi file, but until then is there any way to do this locally? Is that even the right approach when I do push it out for general use?

Otherwise, I was just thinking of setting up a cron job that continuously runs a Python script to do the queries I need and dumps the results to a MongoDB instance or something, so that the clients can simply pull from that. Doing all of the queries on the server side of the Flask app just locks up the server, so nothing else can be done with it (i.e. I can't act on the info, kill spids, etc.).
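The cron-job fallback mentioned above might look something like the following sketch. To keep it self-contained, a JSON file stands in for the MongoDB instance, and `collect_metrics` is a hypothetical placeholder for the real queries:

```python
import json
import random
import time

SNAPSHOT_FILE = 'query_snapshot.json'  # hypothetical output path

def collect_metrics():
    # Placeholder for the real database queries; returns fake numbers.
    return {'db01': random.randint(0, 10), 'db02': random.randint(0, 10)}

def dump_snapshot():
    # Write a timestamped snapshot that web clients can read cheaply,
    # keeping the slow query work entirely outside the web process.
    snapshot = {'collected_at': time.time(), 'metrics': collect_metrics()}
    with open(SNAPSHOT_FILE, 'w') as f:
        json.dump(snapshot, f)
    return snapshot

if __name__ == '__main__':
    print(dump_snapshot())
```

A crontab entry would then just invoke this script on whatever interval the data needs to be fresh.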

Any help with this would be majorly appreciated; my brain has been spinning for days.

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'amqp://guest@localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'amqp://guest@localhost//'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

output = 0

@app.before_first_request
def init():
    task = my_task.apply_async()

@app.route('/')
def hello_world():
    global output
    return 'Hello World! - ' + str(output)


@celery.task
def my_task():
    global output
    result = 0
    for i in range(100):
        result += i
        output = result
if __name__ == '__main__':
    app.run()

Upvotes: 6

Views: 10229

Answers (2)

Visgean Skeloru

Reputation: 2263

Well, first of all: don't try to solve this problem by yourself with threads or any kind of multiprocessing. Why? Because later on you will want to scale up, and the best way is to leave that to the server (gunicorn, uWSGI). If you try to handle it yourself, it will very likely collide with how these servers work.

Instead, you should use one service to process requests and a message queue with a worker process that handles the asynchronous tasks. This approach scales much better.

From your question it seems that you are not looking for an answer but rather for guidance; have a look here: http://flask.pocoo.org/docs/0.10/patterns/celery/ and here: https://www.quora.com/Celery-distributed-task-queue-What-is-the-difference-between-workers-and-processes

The advantage here is that the web worker / task worker / celery solution scales much better than the alternatives as the only bottleneck is the database.
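To make the web-worker / task-worker split concrete without requiring a running broker, here is a minimal sketch of the same idea using the standard-library queue as a stand-in for a real message broker such as RabbitMQ. The names (`submit`, `task_queue`, the job id) are illustrative, not part of any real API:

```python
import queue
import threading

# The "broker": the web process only enqueues work here,
# and a separate worker drains it in the background.
task_queue = queue.Queue()
results = {}

def worker():
    while True:
        task_id, n = task_queue.get()
        results[task_id] = sum(range(n))  # the "long-running" work
        task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()

def submit(task_id, n):
    # What a request handler would do: hand off work and return fast.
    task_queue.put((task_id, n))

submit('job-1', 100)
task_queue.join()           # wait for the worker to finish the job
print(results['job-1'])     # → 4950
```

With Celery, the queue lives in an external broker and the worker is a separate process, so the same pattern keeps working when you scale out to multiple web and worker machines.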

Upvotes: 0

Serdmanczyk

Reputation: 1224

Depending on how complex your query is, you could consider running it in a second thread. Because of the GIL, individual operations on common data structures (such as updating a key in the dictionary) won't be interrupted mid-operation. A nice thing about threads is that, even with the GIL, they are generally good about not blocking other threads during I/O-heavy work (such as running queries). Trivial example:

import threading
import time
import random
from flask import Flask

app = Flask(__name__)

data_store = {'a': 1}
def interval_query():
    while True:
        time.sleep(1)
        vals = {'a': random.randint(0,100)}
        data_store.update(vals)

thread = threading.Thread(name='interval_query', target=interval_query)
thread.daemon = True  # setDaemon() is deprecated; set the attribute instead
thread.start()

@app.route('/')
def hello_world():
    return str(data_store['a'])

if __name__ == "__main__":
    app.run()
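If the background thread ever performs multi-step updates (read, modify, write back), the single-operation atomicity above is no longer enough. A small variant of the example with an explicit lock, as one possible design choice (the helper names are illustrative):

```python
import threading

data_store = {'a': 1}
lock = threading.Lock()

def update_value(key, value):
    # Guard the write so a multi-step update in the background thread
    # can't interleave with a read from a request handler.
    with lock:
        data_store[key] = value

def read_value(key):
    with lock:
        return data_store[key]

update_value('a', 42)
print(read_value('a'))  # → 42
```

The request handler would then call `read_value` instead of touching `data_store` directly.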

Upvotes: 5
