I have the following code, which launches multiple Python processes that continually poll an SQS queue.
The processes are launched with
import multiprocessing

# launch 8 worker processes, numbered 1 through 8
num_processes = range(1, 9)
for p_num in num_processes:
    p = multiprocessing.Process(
        target=sqs_polling, args=(queue_name, p_num,))
    p.start()
and the actual polling function is
from time import sleep

import boto3


def sqs_polling(queue_name, process_id):
    sqs = boto3.resource('sqs', region_name='us-east-1')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    no_messages = False
    # poll sqs forever
    while True:
        # polling delay so aws does not throttle us
        sleep(2.0)
        # sleep longer if there were no messages on the queue the last time it was polled
        if no_messages:
            sleep(900.0)
        message_batch = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
        # remember whether this poll came back empty
        no_messages = len(message_batch) == 0
        # process messages
        for message in message_batch:
            do_something(message)
            message.delete()
This works for a few hours, but eventually SQS appears to throttle the processes: no messages are read even though they exist on the queue. To reduce this, I added a 2-second delay between queue reads, plus a 15-minute delay after any read that returns no messages. In spite of this, I still seem to get throttled. Can anyone explain why throttling is still occurring here? Another possibility is that the connection to the queue goes stale, but I think that is unlikely.
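To make the delay logic described above easier to reason about, here is a minimal sketch of the same loop with the queue and the sleep function passed in, so it can be run without AWS. The names `poll_batches`, `FakeQueue`, `FakeMessage`, and the `max_polls` parameter are hypothetical and exist only for this illustration; they are not part of boto3.

```python
from time import sleep as real_sleep


class FakeMessage:
    """Hypothetical stand-in for an SQS message (illustration only)."""

    def __init__(self, body):
        self.body = body
        self.deleted = False

    def delete(self):
        self.deleted = True


class FakeQueue:
    """Hypothetical stand-in for a queue; returns pre-set batches in order."""

    def __init__(self, batches):
        self._batches = list(batches)

    def receive_messages(self, MaxNumberOfMessages=10, WaitTimeSeconds=20):
        # Return the next prepared batch, or an empty list when none are left.
        return self._batches.pop(0) if self._batches else []


def poll_batches(queue, handle, sleep=real_sleep, max_polls=None,
                 poll_delay=2.0, idle_delay=900.0):
    """Same delay logic as sqs_polling, with injectable queue and sleep."""
    no_messages = False
    polls = 0
    while max_polls is None or polls < max_polls:
        sleep(poll_delay)           # fixed delay before every poll
        if no_messages:
            sleep(idle_delay)       # extra delay after an empty poll
        batch = queue.receive_messages(MaxNumberOfMessages=10,
                                       WaitTimeSeconds=20)
        no_messages = len(batch) == 0
        for message in batch:
            handle(message)
            message.delete()
        polls += 1


# Record the requested sleeps instead of actually waiting.
sleeps = []
handled = []
first, second = FakeMessage("a"), FakeMessage("b")
queue = FakeQueue([[first], [], [second]])
poll_batches(queue, lambda m: handled.append(m.body),
             sleep=sleeps.append, max_polls=3)
print(sleeps)
print(handled)
```

With three polls where the second one is empty, the recorded sleeps show that the process waits the full 15 minutes before it looks at the queue again, even if messages arrive in the meantime. From the outside that gap might be hard to tell apart from throttling, though this sketch does not establish that it is the cause.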
Upvotes: 1
Views: 4501
The question is a bit outdated, but I just released multi_sqs_listener, which provides a high-level, multi-threaded way to listen to multiple SQS queues from Python code.
import time
from multi_sqs_listener import QueueConfig, EventBus, MultiSQSListener


class MyListener(MultiSQSListener):
    def low_priority_job(self, message):
        print('Starting low priority, long job: {}'.format(message))
        time.sleep(5)
        print('Ended low priority job: {}'.format(message))

    def high_priority_job(self, message):
        print('Starting high priority, quick job: {}'.format(message))
        time.sleep(.2)
        print('Ended high priority job: {}'.format(message))

    def handle_message(self, queue, bus, priority, message):
        if bus == 'high-priority-bus':
            self.high_priority_job(message.body)
        else:
            self.low_priority_job(message.body)


low_priority_bus = EventBus('low-priority-bus', priority=1)
high_priority_bus = EventBus('high-priority-bus', priority=5)
EventBus.register_buses([low_priority_bus, high_priority_bus])

low_priority_queue = QueueConfig('low-priority-queue', low_priority_bus)
high_priority_queue = QueueConfig('high-priority-queue', high_priority_bus)

my_listener = MyListener([low_priority_queue, high_priority_queue])
my_listener.listen()
Upvotes: 1