Adam Gent
Adam Gent

Reputation: 49075

RabbitMQ grouping messages as one message ie coalescing messages

I'm trying to understand the best way to coalesce or chunk incoming messages in RabbitMQ (using Spring AMQP or the Java client directly).

In other words I would like to take say 100 incoming messages and combine them as 1 and resend it to another queue in a reliable (correctly ACKed way). I believe this is called the aggregator pattern in EIP.

I know Spring Integration provides an aggregator solution but the implementation looks like its not fail safe (that is it looks like it has to ack and consume messages to build the coalesced message thus if you shutdown it down while its doing this you will loose messages?).

Upvotes: 2

Views: 4668

Answers (2)

kzhen
kzhen

Reputation: 3118

I can't comment directly on the Spring Integration library, so I'll speak generally in terms of RabbitMQ.

If you're not 100% convinced by the Spring Integration implementation of the Aggregator and are going to try to implement it yourself then I would recommend avoiding using tx which uses transactions under the hood in RabbitMQ.

Transactions in RabbitMQ are slow and you will definitely suffer performance problems if you're building a high traffic/throughput system.

Rather I would suggest you take a look at Publisher Confirms which is an extension to AMQP implemented in RabbitMQ. Here is an introduction to it when it was new http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/.

You will need to tweak the prefetch setting to get the performance right, take a look at http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/ for some details.

All the above gives you some background to help solve your problem. The implementation is rather straightforward.

When creating your consumer you will need to ensure you set it so that ACK is required.

  1. Dequeue n messages, as you dequeue you will need to make note of the DeliveryTag for each message (this is used to ACK the message)
  2. Aggregate the messages into a new message
  3. Publish the new message
  4. ACK each dequeued message

One thing to note is that if your consumer dies after 3 and before 4 has completed then those messages that weren't ACK'd will be reprocessed when it comes back to life

Upvotes: 3

Gary Russell
Gary Russell

Reputation: 174494

If you set the <amqp-inbound-channel-adapter/> tx-size attribute to 100, the container will ack every 100 messages so this should prevent message loss.

However, you might want to make the send of the aggregated message (on the 100th receive) transactional so you can confirm the broker has the message before the ack for the inbound messages.

Upvotes: 2

Related Questions