ziu

Reputation: 2720

Yet another producer/consumer problem in Twisted Python

I am building a server which stores key/value data on top of Redis using Twisted Python. The server receives a JSON dictionary via HTTP, which is converted into a Python dictionary and put in a buffer. Every time new data is stored, the server schedules a task which pops one dictionary from the buffer and writes every key/value pair into a Redis instance, using a txredis client.

class Datastore(Resource):

    isLeaf = True

    def __init__(self):
        self.clientCreator = protocol.ClientCreator(reactor, Redis)
        d = self.clientCreator.connectTCP(...)
        d.addCallback(self.setRedis)
        self.redis = None
        self.buffer = deque()

    def render_POST(self, request):
        try:
            task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
        except IndexError:
            request.setResponseCode(503)
            return '<html><body>Error reading task_id</body></html>'

        data = json.loads(request.content.read())
        self.buffer.append((task_id, data))
        reactor.callLater(0, self.write_on_redis)
        return ' '

    @defer.inlineCallbacks
    def write_on_redis(self):
        try:
            task_id, dic = self.buffer.pop()
            log.msg('Buffer: %s' % len(self.buffer))
        except IndexError:
            log.msg('buffer empty')
            defer.returnValue(1)

        m = yield self.redis.sismember('DONE', task_id)
        # Simple check
        if m == '1':
            log.msg('%s already stored' % task_id)
        else:
            log.msg('%s unpacking' % task_id)
            s = yield self.redis.sadd('DONE', task_id)

            d = defer.Deferred()
            for k, v in dic.iteritems():
                k = k.encode()
                d.addCallback(self.redis.push, k, v)

            d.callback(None)

Basically, I am facing a producer/consumer problem between two different connections, but I am not sure that the current implementation works well in the Twisted paradigm. I have read the brief documentation about the producer/consumer interfaces in Twisted, but I am not sure whether I can use them in my case. Any criticism is welcome: I am trying to get a grasp of event-driven programming, after too many years of thread concurrency.

Upvotes: 3

Views: 2231

Answers (1)

Jean-Paul Calderone

Reputation: 48335

The producer and consumer APIs in Twisted, IProducer and IConsumer, are about flow control. You don't seem to have any flow control here; you're just relaying messages from one protocol to another.

Since there's no flow control, the buffer is just extra complexity. You could get rid of it by just passing the data directly to the write_on_redis method. This way write_on_redis doesn't need to handle the empty buffer case, you don't need the extra attribute on the resource, and you can even get rid of the callLater (although you can also do this even if you keep the buffer).

I don't know if any of this answers your question, though. As far as whether this approach "works well", here are the things I notice just by reading the code:

  • If data arrives faster than redis accepts it, your list of outstanding jobs may become arbitrarily large, causing you to run out of memory. This is what flow control would help with.
  • With no error handling around the sismember call or the sadd call, you may lose tasks if either of these fails, since you've already popped them from the work buffer.
  • Doing a push as a callback on that Deferred d also means that any failed push will prevent the rest of the data from being pushed. It also passes the result of the Deferred returned by push (I'm assuming it returns a Deferred) as the first argument to the next call, so unless push more or less ignores its first argument, you won't be pushing the right data to redis.

If you want to implement flow control, then you need to have your HTTP server check the length of self.buffer and possibly reject the new task - not adding it to self.buffer and returning some error code to the client. You still won't be using IConsumer and IProducer, but it's sort of similar.
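A minimal sketch of that admission check, using only the standard library. BoundedBuffer, offer, take and the limit are hypothetical names chosen for illustration; the limit would need tuning for the real workload.

```python
from collections import deque


class BoundedBuffer(object):
    """Hypothetical work buffer that refuses new items when full."""

    def __init__(self, limit):
        self.limit = limit  # assumed maximum number of pending tasks
        self.items = deque()

    def offer(self, item):
        # Return False instead of growing without bound.
        if len(self.items) >= self.limit:
            return False
        self.items.append(item)
        return True

    def take(self):
        # Oldest task first.
        return self.items.popleft()


buf = BoundedBuffer(limit=2)
accepted = [buf.offer(('task-%d' % i, {})) for i in range(3)]
first = buf.take()
accepted_after_take = buf.offer(('task-3', {}))
```

In render_POST, a False from offer would translate into request.setResponseCode(503) and an error body, so the client knows to retry later.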

Upvotes: 3
