jbaiter

Reputation: 7099

gunicorn with gevent workers: Using a shared global list

I am trying to implement Server-Sent Events in my Flask application by following this simple recipe: http://flask.pocoo.org/snippets/116/

For serving the app, I use gunicorn with gevent workers.

A minimal version of my code looks like this:

import multiprocessing

from gevent.queue import Queue
from gunicorn.app.base import BaseApplication
from flask import Flask, Response

app = Flask('minimal')
# NOTE: This is the global list of subscribers
subscriptions = []


class ServerSentEvent(object):
    def __init__(self, data):
        self.data = data
        self.event = None
        self.id = None
        self.desc_map = {
            self.data: "data",
            self.event: "event",
            self.id: "id"
        }

    def encode(self):
        if not self.data:
            return ""
        lines = ["%s: %s" % (v, k)
                 for k, v in self.desc_map.iteritems() if k]
        return "%s\n\n" % "\n".join(lines)


@app.route('/api/events')
def subscribe_events():
    def gen():
        q = Queue()
        print "New subscription!"
        subscriptions.append(q)
        print len(subscriptions)
        print id(subscriptions)
        try:
            while True:
                print "Waiting for data"
                result = q.get()
                print "Got data: " + result
                ev = ServerSentEvent(unicode(result))
                yield ev.encode()
        except GeneratorExit:
            print "Removing subscription"
            subscriptions.remove(q)
    return Response(gen(), mimetype="text/event-stream")


@app.route('/api/test')
def push_event():
    print len(subscriptions)
    print id(subscriptions)
    for sub in subscriptions:
        sub.put("test")
    return "OK"


class GunicornApplication(BaseApplication):
    def __init__(self, wsgi_app, port=5000):
        self.options = {
            'bind': "0.0.0.0:{port}".format(port=port),
            'workers': multiprocessing.cpu_count() + 1,
            'worker_class': 'gevent',
            'preload_app': True,
        }
        self.application = wsgi_app
        super(GunicornApplication, self).__init__()

    def load_config(self):
        config = dict([(key, value) for key, value in self.options.iteritems()
                       if key in self.cfg.settings and value is not None])
        for key, value in config.iteritems():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == '__main__':
    gapp = GunicornApplication(app)
    gapp.run()

The problem is that the subscriber list seems to be different for every worker. If worker #1 handles the /api/events request and adds a new subscriber to its list, the client only receives events pushed through /api/test when worker #1 also happens to handle that request.

Curiously enough, the actual list object seems to be the same for each worker, since id(subscriptions) returns the same value in every worker.

Is there a way around this? I know that I could just use Redis, but the application is supposed to be as self-contained as possible, so I'm trying to avoid any external services.

Update: The cause of the problem seems to be in my embedding of gunicorn.app.base.BaseApplication (which is a new feature in v0.19). When running the application from the command line with gunicorn -k gevent minimal:app, everything works as expected.

Update 2: The previous suspicion turned out to be wrong. It only worked because gunicorn's default number of worker processes is 1. When the number is adjusted to match the code via the -w parameter, it exhibits the same behavior.

Upvotes: 4

Views: 2052

Answers (1)

Wayne

Reputation: 26

You say:

the actual list object seems to be the same for each worker, since id(subscriptions) returns the same value in every worker.

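but I don't think that's true: the subscriptions list in each worker is not the same object. Each worker is an individual process with its own memory space. An identical id() only means the list sits at the same virtual address in every process, which is expected when the workers are forked from a master that has already created it (preload_app is set). After the fork, each process modifies its own copy.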
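The point about separate memory spaces can be checked without gunicorn. The following is a minimal sketch, assuming CPython on a Unix-like system where os.fork is available; the helper name child_view_after_fork is made up for illustration. It creates a list before forking (similar to what preload_app does), appends to it in the child, and compares what the child and the parent see.

```python
import os

# Created before the fork, as a preloaded module-level list would be.
subscriptions = []


def child_view_after_fork():
    """Fork, append to the list in the child, and return the child's
    (id, length) view of `subscriptions` to the parent via a pipe."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: mutate its own copy and report back.
        os.close(read_fd)
        subscriptions.append("queue-in-child")
        msg = "%d %d" % (id(subscriptions), len(subscriptions))
        os.write(write_fd, msg.encode("ascii"))
        os.close(write_fd)
        os._exit(0)
    # Parent process: read the child's report and reap the child.
    os.close(write_fd)
    data = os.read(read_fd, 1024).decode("ascii")
    os.close(read_fd)
    os.waitpid(pid, 0)
    child_id, child_len = (int(part) for part in data.split())
    return child_id, child_len


child_id, child_len = child_view_after_fork()
# Same virtual address in both processes ...
print("same id:", child_id == id(subscriptions))
# ... but the child's append never reached the parent's list.
print("child length:", child_len, "parent length:", len(subscriptions))
```

The ids match because the child starts as a copy of the parent's address space, while the lengths differ because each process writes to its own copy.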

For a self-contained system, you could build a small component that works like a simplified version of Redis, for example by using SQLite or ZeroMQ to communicate between the workers.
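As a rough illustration of the SQLite idea, here is a sketch that uses only the standard library sqlite3 module. The table name events and the helpers open_db, publish, and fetch_since are assumptions made for this example, not part of the question's code. Each worker would open its own connection to the same database file; the /api/test handler would call publish, and each subscriber generator would periodically call fetch_since with the last id it has seen instead of blocking on an in-process queue.

```python
import os
import sqlite3
import tempfile


def open_db(path):
    """Open a per-process connection and make sure the event table exists."""
    conn = sqlite3.connect(path, timeout=5.0)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL)"
    )
    conn.commit()
    return conn


def publish(conn, data):
    """Append one event; the `with` block commits the insert."""
    with conn:
        conn.execute("INSERT INTO events (data) VALUES (?)", (data,))


def fetch_since(conn, last_id):
    """Return all (id, data) rows newer than last_id, oldest first."""
    cursor = conn.execute(
        "SELECT id, data FROM events WHERE id > ? ORDER BY id", (last_id,)
    )
    return cursor.fetchall()


# Two connections stand in for two worker processes sharing one file.
path = os.path.join(tempfile.mkdtemp(), "events.db")
worker_a = open_db(path)
worker_b = open_db(path)

publish(worker_a, "test")          # e.g. the /api/test handler
rows = fetch_since(worker_b, 0)    # e.g. a subscriber in another worker
print(rows)
```

Polling adds some latency and the sqlite3 calls block the gevent loop briefly, so this trades simplicity for responsiveness. ZeroMQ would avoid polling but adds a dependency.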

Upvotes: 1
