Reputation: 429
I have a simple producer/consumer amqp set up like this:
producer -> e1:jobs_queue -> consumer -> e2:results_queue -> result_handler
The producer sends of some number of jobs. The consumer pulls down jobs one at a time, and processes them, pushing the result into another queue. These are then pulled out by the result_handler that publishes the results to a database.
Sometimes the consumer fails - it may get killed by the OS or throw an exception. If this happens while it is processing a message then this message is lost, no corresponding result is produced and I am sad. I would be happy again if the failed job was re-queued.
What I'm looking for is a design pattern for ensuring that either consumer processes the job to completion and puts a corresponding result into *results_queue*, or if it fails that the job is put back into *jobs_queue*. As consumer is what is failing, consumer should not be responsible for managing any messages relating to its own supervision.
We know that consumer has failed to process a job if:
For my application, we can probably capture the 2nd case by simply waiting for processing of the job to time out. In production, there will be many workers to supervise, all pulling jobs from a common jobs list and putting results into a single results exchange/queue.
Upvotes: 4
Views: 1645
Reputation: 3142
The easiest way to achieve what you want is to manually handle acknowledgements for messages received. In node-amqp
it's as simple as adding the option { ack: true }
to the queue.subscribe
call. You can then acknowledge messages by calling some function on the queue. In case of node-amqp
it's queue.shift()
.
You can also set the number of not-yet-acknowledged messages the consumer is allowed, using prefetchCount
.
Any unacknowledged messages will now be redelivered (to any connected consumers) if the consumer disconnects.
By also setting the queue to durable
and autoDelete: false
, you can additionally make sure that the queue (and messages on it) will not be deleted upon restart of your MQ-server or disconnection of the last consumer.
Upvotes: 2