Reputation: 2720
I am building a server that stores key/value data on top of Redis using Twisted Python. The server receives a JSON dictionary via HTTP, which is converted into a Python dictionary and put in a buffer. Every time new data is stored, the server schedules a task that pops one dictionary from the buffer and writes every tuple into a Redis instance, using a txredis client.
# Imports implied by the snippet below; the txredis import path may vary by version.
import json
from collections import deque

from twisted.internet import defer, protocol, reactor
from twisted.python import log
from twisted.web.resource import Resource

from txredis.protocol import Redis


class Datastore(Resource):
    isLeaf = True

    def __init__(self):
        self.clientCreator = protocol.ClientCreator(reactor, Redis)
        d = self.clientCreator.connectTCP(...)
        d.addCallback(self.setRedis)
        self.redis = None
        self.buffer = deque()

    def setRedis(self, redis):
        # Not shown in the original post: store the connected client.
        self.redis = redis

    def render_POST(self, request):
        try:
            task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
        except IndexError:
            request.setResponseCode(503)
            return '<html><body>Error reading task_id</body></html>'
        data = json.loads(request.content.read())
        self.buffer.append((task_id, data))
        reactor.callLater(0, self.write_on_redis)
        return ' '

    @defer.inlineCallbacks
    def write_on_redis(self):
        try:
            task_id, dic = self.buffer.pop()
            log.msg('Buffer: %s' % len(self.buffer))
        except IndexError:
            log.msg('buffer empty')
            defer.returnValue(1)

        m = yield self.redis.sismember('DONE', task_id)
        # Simple check
        if m == '1':
            log.msg('%s already stored' % task_id)
        else:
            log.msg('%s unpacking' % task_id)
            s = yield self.redis.sadd('DONE', task_id)
            d = defer.Deferred()
            for k, v in dic.iteritems():
                k = k.encode()
                d.addCallback(self.redis.push, k, v)
            d.callback(None)
Basically, I am facing a Producer/Consumer problem between two different connections, but I am not sure that the current implementation works well in the Twisted paradigm. I have read the short documentation on producer/consumer interfaces in Twisted, but I am not sure whether I can use them in my case. Any criticism is welcome: I am trying to get a grasp of event-driven programming after too many years of thread concurrency.
Upvotes: 3
Views: 2231
Reputation: 48335
The producer and consumer APIs in Twisted, IProducer and IConsumer, are about flow control. You don't seem to have any flow control here; you're just relaying messages from one protocol to another.

Since there's no flow control, the buffer is just extra complexity. You could get rid of it by just passing the data directly to the write_on_redis method. This way write_on_redis doesn't need to handle the empty-buffer case, you don't need the extra attribute on the resource, and you can even get rid of the callLater (although you can also do this even if you keep the buffer).
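A minimal sketch of that simplification, reusing the names from the question; write_on_redis would then take (task_id, dic) as parameters and drop the empty-buffer branch:

    def render_POST(self, request):
        try:
            task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
        except IndexError:
            request.setResponseCode(503)
            return '<html><body>Error reading task_id</body></html>'
        data = json.loads(request.content.read())
        # No buffer and no callLater: hand the task straight to the writer,
        # which now takes the task as arguments instead of popping a deque.
        self.write_on_redis(task_id, data)
        return ' '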
I don't know if any of this answers your question, though. As far as whether this approach "works well", here are the things I notice just by reading the code:
- If anything goes wrong with the sismember call or the sadd call, you may lose tasks if either of these fail, since you've already popped them from the work buffer.
- Chaining every push onto the Deferred d also means that any failed push will prevent the rest of the data from being pushed. It also passes the result of the Deferred returned by push (I'm assuming it returns a Deferred) as the first argument to the next call, so unless push more or less ignores its first argument, you won't be pushing the right data to redis (see the sketch below for one way to issue the pushes independently).

If you want to implement flow control, then you need to have your HTTP server check the length of self.buffer and possibly reject the new task - not adding it to self.buffer and returning some error code to the client. You still won't be using IConsumer and IProducer, but it's sort of similar; a sketch of that check also follows below.
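To make the second point concrete, here is one way (only a sketch, assuming push returns a Deferred as noted above) to issue the pushes independently inside write_on_redis, so each push gets its own key/value and one failure doesn't block the rest:

            pushes = []
            for k, v in dic.iteritems():
                # Each push gets its own Deferred; no result is threaded
                # from one call into the next.
                pushes.append(self.redis.push(k.encode(), v))
            # Wait for all of them; consumeErrors keeps individual failures
            # from becoming unhandled errors.
            results = yield defer.DeferredList(pushes, consumeErrors=True)
            # results is a list of (success, value) pairs, one per key.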
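And a rough sketch of that kind of back-pressure, keeping the buffer from the question; the limit of 1000 queued tasks and the error body are made up for the example:

    def render_POST(self, request):
        if len(self.buffer) >= 1000:  # arbitrary limit for this sketch
            # Crude flow control: refuse new work instead of queueing it,
            # and let the client retry later.
            request.setResponseCode(503)
            return '<html><body>Too many queued tasks</body></html>'
        try:
            task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
        except IndexError:
            request.setResponseCode(503)
            return '<html><body>Error reading task_id</body></html>'
        data = json.loads(request.content.read())
        self.buffer.append((task_id, data))
        reactor.callLater(0, self.write_on_redis)
        return ' '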
Upvotes: 3