Reputation: 26426
(Python 3)
I am using a Python generator to read messages from a queue.
After the consumer reads a queue message, it needs to be able to tell the generator to delete the queue message if it was successfully processed.
In order to .send() to a Python generator, it seems I must first .send(None) to the generator. This is making my code fatter than I think it should be.
Can anyone suggest a way for qconsumer.py to drive the generator with fewer lines of code? I have identified which lines below I am hoping to eliminate.
In short, how can I make the code below more compact, any suggestions for how I can delete lines?
Code below is qconsumer.py:
from qserver import Qserver
myqserver = Qserver()
myproducer = myqserver.producer() # trying to eliminate this line
# first send to a generator must be None
myproducer.send(None) # trying to eliminate this line
for msg in myproducer:
# do something with message
print(msg)
if messageprocessok:
myproducer.send('delete')
Code below is qserver.py:
# -*- coding: utf-8 -*-
import boto
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message
QNAME = 'qinbound'
SQSREGION = 'us-west-1'
class Qserver():
"""A simple Q server."""
def __init__(self, qname=None, sqsregion=None):
self.qname = qname or QNAME
self.sqsregion = sqsregion or SQSREGION
self.sqsconn = boto.sqs.connect_to_region(self.sqsregion)
self.q_in = self.sqsconn.get_queue(self.qname)
def producer(self):
while True:
qmessage = self.q_in.read(wait_time_seconds=20)
if qmessage is None:
continue
action = (yield qmessage.get_body())
if action == 'delete':
# if processing completed ok, clear message from this queue
self.q_in.delete_message(qmessage)
Upvotes: 4
Views: 268
Reputation: 25207
Your current consumer is throwing away messages because each send
call returns one. You should do this instead:
myqserver = Qserver()
myproducer = myqserver.producer()
messageprocessok = False
while True:
msg = myproducer.send('delete' if messageprocessok else None)
# do something with message
print(msg)
or alternatively:
myqserver = Qserver()
myproducer = myqserver.producer()
msg = next(myproducer)
while True:
# do something with message
print(msg)
msg = myproducer.send('delete' if messageprocessok else None)
The fact that you need separate calls to Qserver()
and myqserver.producer()
is simply because you made prouducer
a method of a class. Alternatively you could use a stand-alone function, or make a wrapper function that simply returns Qserver().producer()
. Here's the stand-alone version:
def producer(qname=None, sqsregion=None):
qname = qname or QNAME
sqsregion = sqsregion or SQSREGION
sqsconn = boto.sqs.connect_to_region(sqsregion)
q_in = sqsconn.get_queue(qname)
while True:
qmessage = q_in.read(wait_time_seconds=20)
if qmessage is None:
continue
action = (yield qmessage.get_body())
if action == 'delete':
# if processing completed ok, clear message from this queue
q_in.delete_message(qmessage)
Upvotes: 2
Reputation: 2175
Having understood what you're trying to do, I think I would avoid mixing send
with iteration. Having the myqserver
class be an iterator itself seems to make more sense to me:
# -*- coding: utf-8 -*-
import boto
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message
QNAME = 'qinbound'
SQSREGION = 'us-west-1'
class Qserver():
"""A simple Q server."""
_current_message = None
def __init__(self, qname=None, sqsregion=None):
self.qname = qname or QNAME
self.sqsregion = sqsregion or SQSREGION
self.sqsconn = boto.sqs.connect_to_region(self.sqsregion)
self.q_in = self.sqsconn.get_queue(self.qname)
def __iter__(self):
return self
def __next__(self):
while True:
qmessage = self.q_in.read(wait_time_seconds=20)
if qmessage is not None:
self._current_message = qmessage
return qmessage
next = __next__
def delete_current(self):
if self._current_message is not None:
self.q_in.delete_message(self._current_message)
And usage will be something like:
from qserver import Qserver
myqserver = Qserver()
for msg in myqserver:
# do something with message
print(msg)
if messageprocessok:
myqserver.delete_current()
Upvotes: 1