Yuri Makassiouk

Reputation: 557

RabbitMQ-based messaging with long-running consumer process - I need a better pattern

I have a service that consumes messages from a RabbitMQ queue. Its main characteristic is a long-running handler: processing can take anything from 5 minutes to 6 hours. In fact, there is no real upper limit and, importantly, it is often longer than RabbitMQ's recommended and default acknowledgement timeout of 30 minutes. So, to avoid the acknowledgement timeout, my current pattern is:

This is implemented in a .NET application using the official RabbitMQ.Client library.

A big downside of this pattern is that I have to acknowledge the message before the work is done. If something happens (the service dies before finishing, for whatever reason), there is no result and the message is gone. A naive solution would be to set a gigantic timeout, like 2 days, and ACK after the work is finished. But then, if something happens before the acknowledgement, the message isn't requeued until the timeout expires, and 2 days is a long time to wait. So, ideally, we would want a short but extendable timeout. So that we could

This way, if the service dies and doesn't "extend the lease", the message is requeued after the short timeout (say, 10 minutes) has expired, and the next attempt can be made. The one thing missing is such a "please extend my lease" API. Or am I missing something? In any case, please share your thoughts on this.

Upvotes: 2

Views: 1128

Answers (2)

Yuri Makassiouk

Reputation: 557

So, it looks like RabbitMQ does not have "leasing" of messages. I don't know whether any other message bus does, but that is not up for discussion this time: let's say we can't switch the message bus right now anyway.

So, my plan to improve the situation is to create an additional service, a kind of lease manager, with its own persistent storage. The handling will then look like this:

  • Long-running service A receives a single message from a RabbitMQ queue

  • It pauses receiving messages (until the work is done)

  • It packs together the message's payload, some headers that we use in messages, the requeue destination (in most cases, the queue we just received from) and the lease timeout, and sends this package to the lease manager service LM.

    • The lease timeout should be compatible with how often service A is able to emit a heartbeat message

  • Service A ACKs the message to RabbitMQ

  • Service A starts working on the task

  • Service A periodically emits the "still working" heartbeat message to LM.

  • When done with the work, Service A sends a message to LM with a command to finish the lease and dispose of the message

  • Service A "opens for business" again - starts listening to the task messages.

  • LM, upon receiving the first registration of a message, persists it to storage and ACKs the message (this should be a quick operation, like everything else LM does)

  • LM, upon receiving the heartbeat message, looks up the persisted original message and extends its lease by the lease timeout value (updates the storage with the new timestamp (current time + lease timeout))

  • LM, upon receiving the "work completed" message, looks up the original message and deletes it from storage

  • LM also has a scheduled task, in which it looks up all the original messages with an expired lease, re-queues them to the provided queues and removes them from storage. Actually, it should probably remove them first, make sure the removal worked, and only then re-queue; otherwise a malfunction here causes a small tragedy.
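To make the LM side of the steps above more concrete, here is a minimal sketch in Python (chosen only for brevity; the same shape would carry over to .NET). Everything in it is an assumption made for illustration: the `LeaseManager` and `Lease` names, the `publish` callback standing in for a real RabbitMQ publish, and the in-memory dict standing in for the persistent storage.

```python
import time
from dataclasses import dataclass


@dataclass
class Lease:
    payload: bytes
    headers: dict
    requeue_to: str    # queue to publish to if the lease expires
    timeout: float     # lease length in seconds
    expires_at: float  # deadline, in the units of the injected clock


class LeaseManager:
    """In-memory stand-in for LM; a real one would use persistent storage
    and a wall-clock timestamp that survives restarts."""

    def __init__(self, publish, clock=time.monotonic):
        self._publish = publish  # hypothetical: publish(queue, payload, headers)
        self._clock = clock
        self._leases = {}

    def register(self, lease_id, payload, headers, requeue_to, timeout):
        # First registration: store the original message with its deadline.
        deadline = self._clock() + timeout
        self._leases[lease_id] = Lease(payload, headers, requeue_to, timeout, deadline)

    def heartbeat(self, lease_id):
        # Extend the lease to "current time + lease timeout".
        lease = self._leases.get(lease_id)
        if lease is None:
            return False  # unknown or already expired and re-queued
        lease.expires_at = self._clock() + lease.timeout
        return True

    def complete(self, lease_id):
        # Work finished: dispose of the stored message.
        self._leases.pop(lease_id, None)

    def sweep(self):
        # Scheduled task: remove expired leases first, then re-queue them.
        now = self._clock()
        expired = [k for k, v in self._leases.items() if v.expires_at <= now]
        for lease_id in expired:
            lease = self._leases.pop(lease_id)
            self._publish(lease.requeue_to, lease.payload, lease.headers)
        return expired
```

Returning `False` from `heartbeat` for an unknown lease is a design choice of this sketch, not part of the plan above: it gives service A a way to notice that its message has already been re-queued.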

Not too complex, but still quite a few moving parts. This design is still a work in progress. If I can simplify it, I will.
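On the Service A side, the periodic "still working" heartbeat could be emitted from a background thread for as long as the handler runs. The following is only a sketch under assumptions: `run_with_heartbeat`, `work` and `send_heartbeat` are hypothetical names, and `send_heartbeat` stands in for whatever call actually delivers the heartbeat to LM.

```python
import threading


def run_with_heartbeat(work, send_heartbeat, interval):
    """Run work() while a background thread calls send_heartbeat()
    every `interval` seconds; stop the heartbeat when work() returns or raises."""
    done = threading.Event()

    def beat():
        # Event.wait returns False on timeout and True once `done` is set,
        # so the loop exits promptly when the work finishes.
        while not done.wait(interval):
            send_heartbeat()

    # A daemon thread dies with the process, so a crashed service stops
    # extending its lease without any extra cleanup.
    thread = threading.Thread(target=beat, daemon=True)
    thread.start()
    try:
        return work()
    finally:
        done.set()
        thread.join()
```

One caveat of this shape: if the handler hangs without the process dying, the heartbeat thread keeps extending the lease, so the work itself may need to report progress if that case matters.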

Upvotes: 1

Maxim Fateev

Reputation: 6890

Consider using a separate subsystem to process the long-running task. For example, temporal.io supports tasks of unlimited duration with a heartbeat.

Upvotes: 0
