Sathish

Reputation: 2190

How to throttle my cron worker from pushing messages to RabbitMQ?

Context:

We have a microservice that consumes (subscribes to) messages from 50+ RabbitMQ queues.

Messages for these queues are produced in two places:

  1. When the application encounters business logic with a short execution delay (such as sending emails or notifying another service), it sends the message directly to the exchange, which in turn routes it to the queue.

  2. When we encounter business logic with a long or delayed execution, we have a messages table that holds entries for messages that have to be executed after some time.

We have a cron worker that runs every 10 minutes, scans the messages table, and pushes the messages to RabbitMQ.

Scenario:

Let's say the messages table has 10,000 messages that will be queued in the next cron run:

  1. 9:00 AM - The cron worker runs and queues 10,000 messages to the RabbitMQ queue.
  2. We do have subscribers listening to the queue, and they start consuming the messages, but due to some issue in the system or a slow third-party response, each message takes 1 minute to complete.
  3. 9:10 AM - The cron worker runs again 10 minutes later and sees that 9,000+ messages are still not completed and their scheduled time has passed, so it once again pushes 9,000+ duplicate messages to the queue.

Note: The subscribers that consume the messages are idempotent, so duplicate processing is not an issue.

Design idea I had in mind, but it is not the best logic

I can have 4 statuses (RequiresQueuing, Queued, Completed, Failed):

  1. Whenever a message is inserted, I can set the status to RequiresQueuing.
  2. When the cron worker picks up the messages and pushes them successfully to the queue, I can set the status to Queued.
  3. When a subscriber finishes a message, it marks the status as Completed or Failed.

There is an issue with the above logic: let's say RabbitMQ somehow goes down, or in some cases we have to purge the queue for maintenance.

Now the messages marked as Queued are in the wrong state, because they have to be identified again and their status changed manually.

Another Example

Let's say I have a RabbitMQ queue named events.

This events queue has 5 subscribers. Each subscriber gets 1 message from the queue and posts the event to another microservice (event-aggregator) using a REST API. Each API call usually takes 50ms.

Use Case:

  1. Due to high load, the number of events produced becomes 3x.
  2. The microservice (event-aggregator) that accepts the events has also become slow, and its response time has increased from 50ms to 1 minute.
  3. The cron worker follows the design mentioned above and queues messages every minute. Now the queue is becoming too large, but I also cannot increase the number of subscribers because the dependent microservice (event-aggregator) is lagging too.
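To put rough numbers on this use case, here is a small back-of-the-envelope calculation. It assumes each subscriber handles exactly one message at a time, as described above; the function name `capacity_per_minute` is made up for the example.

```python
def capacity_per_minute(subscribers, call_ms):
    """Messages per minute the subscribers can drain, one call at a time."""
    return subscribers * 60_000 / call_ms


normal = capacity_per_minute(5, 50)        # 50ms per API call
degraded = capacity_per_minute(5, 60_000)  # 1 minute per API call

print(normal)    # 6000.0
print(degraded)  # 5.0
```

So the same 5 subscribers go from draining about 6,000 messages per minute to about 5 per minute, while production has tripled. Anything produced beyond that rate just accumulates in the queue.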

Now the question is: if I keep sending messages to the events queue, it just bloats the queue.

https://www.rabbitmq.com/memory.html - While reading this page, I found that RabbitMQ raises a memory alarm and blocks connections that publish messages once its memory use reaches the high watermark fraction (default is 40%). Of course this can be changed, but that requires manual intervention.

So if the queue length increases, it affects RabbitMQ's memory, which is the reason I thought of throttling at the producer level.

Questions

  1. How can I throttle my cron worker so that it skips a particular run, or somehow inspects the queue, identifies that it is already heavily loaded, and does not push the messages?
  2. How can I handle the use cases described above? Is there a design that solves my problem? Has anyone faced the same issue?

Thanks in advance.

Answer

Check the comments on the accepted answer for throttling using queueCount.
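For reference, throttling on the queue count could look something like the sketch below. It assumes the pika client, where a passive `queue_declare` returns a frame whose `method.message_count` is the number of ready messages (unacknowledged messages are not included). The names `get_queue_depth` and `run_cron` and the `max_depth` threshold are hypothetical and would need tuning.

```python
def get_queue_depth(channel, queue_name):
    """Return the number of ready messages without modifying the queue."""
    # passive=True only inspects the queue; the broker raises a channel
    # error if the queue does not exist.
    frame = channel.queue_declare(queue=queue_name, passive=True)
    return frame.method.message_count


def run_cron(channel, queue_name, due_messages, max_depth=1000):
    """Publish due messages only while the queue is below max_depth.

    Returns how many messages were published in this run; the rest stay
    in the table for a later run.
    """
    depth = get_queue_depth(channel, queue_name)
    if depth >= max_depth:
        return 0  # queue is already heavily loaded, skip this run
    budget = max_depth - depth
    batch = due_messages[:budget]
    for body in batch:
        # Default exchange: routing_key is the queue name.
        channel.basic_publish(exchange="", routing_key=queue_name, body=body)
    return len(batch)
```

Here `channel` would be a channel opened on a `pika.BlockingConnection`. The depth check is only a snapshot, so this keeps the queue roughly bounded rather than strictly bounded.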

Upvotes: 3

Views: 924

Answers (1)

Gowtham

Reputation: 91

You can combine QoS (quality of service) and manual ACK to get around this problem. Your exact scenario is documented in https://www.rabbitmq.com/tutorials/tutorial-two-python.html. This example is for Python; you can refer to the examples for other languages as well.

Let's say you have 1 publisher and 5 worker scripts, all reading from the same queue. Each worker script takes 1 minute to process a message. You can set QoS at the channel level. If you set the prefetch count to 1, each worker script will be allocated only 1 message at a time, so 5 messages are processed at a time. No new messages will be delivered until one of the 5 worker scripts sends a manual ACK.
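A minimal worker along these lines, assuming the pika client used in that tutorial, might look like the sketch below. The queue name `events`, the `localhost` broker, and the helper names `make_callback`, `start_worker`, and `main` are assumptions for the example, and the queue is assumed to exist already.

```python
def make_callback(handler):
    """Wrap a handler so the message is ACKed only after it succeeds."""
    def on_message(channel, method, properties, body):
        handler(body)  # e.g. the slow REST call to the other service
        # Manual ACK: only now will the broker send this worker
        # its next message.
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return on_message


def start_worker(channel, queue_name, handler):
    """Consume with at most one unacknowledged message per worker."""
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=make_callback(handler),
        auto_ack=False,  # manual ACK mode
    )
    channel.start_consuming()


def main():
    # pika is assumed to be installed (pip install pika).
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters("localhost")
    )
    channel = connection.channel()
    start_worker(channel, "events", handler=lambda body: print(body))
```

Running `main()` in 5 separate processes gives the 5 workers described above. If the handler raises, the message is never ACKed and the broker redelivers it once that worker's channel closes.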

If you want to increase message-processing throughput, you can increase the number of worker nodes.

Updating the tables based on message status is not a good option. Avoiding DB polling is the main reason systems use queues, and the status approach would cause a scaling issue. At some point you have to update the tables, and you would hit a bottleneck because of locking and isolation levels.

Upvotes: 3
