Reputation: 541
I'm consuming messages from a RabbitMQ queue with Spring AMQP. I'm consuming one message at a time and it's pretty slow because I save each one to the DB, so I'm opening and closing a transaction every time.
Right now I've set up a consumer like this:
@RabbitListener(queues = "queuename")
public void receive(Message message) {
    someservice.saveToDb(message);
}
But this is really slow. I would like to consume a batch of messages before I start saving them. Then I can open a transaction, save 300 messages, commit, and load the next batch.
Would something like this work?
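class MessageChannelTag {
    final Message message;
    final Channel channel;
    final long tag;

    MessageChannelTag(Message message, Channel channel, long tag) {
        this.message = message;
        this.channel = channel;
        this.tag = tag;
    }
}

@Component
class ConsumerClass {
    // Written by the listener thread and read by the scheduler thread, so every access is guarded.
    private final List<MessageChannelTag> messagesToSave = new ArrayList<>();

    @RabbitListener(queues = "queuename")
    public void receive(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        synchronized (messagesToSave) {
            messagesToSave.add(new MessageChannelTag(message, channel, tag));
        }
    }

    @Scheduled(fixedDelay = 500)
    public void saveMessagesToDb() throws IOException {
        List<MessageChannelTag> saveThese;
        synchronized (messagesToSave) {
            saveThese = new ArrayList<>(messagesToSave);
            messagesToSave.clear();
        }
        service.saveMessages(saveThese);
        // Iterate over the copy: messagesToSave has already been cleared.
        for (MessageChannelTag messageChannelTag : saveThese) {
            // In the service I could mark the rows if the save succeeded or not and
            // then out here I could ack or nack..
            messageChannelTag.channel.basicAck(messageChannelTag.tag, false);
        }
    }
}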
Or if there is a simpler solution let me know. I prefer fast, simple and robust =)
Upvotes: 4
Views: 2104
Reputation: 9667
Don't use the "pull" API (basic.get); it is not nearly as efficient as consuming messages.
Set prefetch (also known as QoS) to 300, then acknowledge all of the messages at once when you are done (a single ack of the last delivery tag with the "multiple" flag set). I am not familiar with Spring but I'm certain there are decorators or other ways to accomplish this.
This is all covered in the docs and tutorials - https://www.rabbitmq.com/consumer-prefetch.html
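The prefetch-then-ack-once idea can be sketched without any Spring or RabbitMQ dependency. The class below is a hypothetical helper, not part of any library: `BatchingAcker`, `saver` and `multiAck` are names made up for illustration. In a real listener, `multiAck` would presumably wrap the RabbitMQ Java client call `channel.basicAck(tag, true)`, and `saver` would wrap the one-transaction-per-batch service call. The sketch assumes manual acknowledgements and a prefetch at least as large as the batch size; with a smaller prefetch the broker stops delivering before the batch fills.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Hypothetical helper: buffers deliveries and acknowledges a whole batch with one call. */
final class BatchingAcker<T> {
    private final int batchSize;
    private final Consumer<List<T>> saver;  // assumed: saves one batch in one DB transaction
    private final LongConsumer multiAck;    // assumed: tag -> channel.basicAck(tag, true)
    private final List<T> buffer = new ArrayList<>();
    private long lastTag = -1;

    BatchingAcker(int batchSize, Consumer<List<T>> saver, LongConsumer multiAck) {
        this.batchSize = batchSize;
        this.saver = saver;
        this.multiAck = multiAck;
    }

    /** Called once per delivery; flushes as soon as the batch is full. */
    synchronized void onDelivery(T payload, long deliveryTag) {
        buffer.add(payload);
        lastTag = deliveryTag;
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Can also be called from a timer so a partial batch is not held forever. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // If the saver throws, the buffer is kept and nothing is acknowledged.
        saver.accept(new ArrayList<>(buffer));
        buffer.clear();
        // One ack covers every delivery up to and including lastTag.
        multiAck.accept(lastTag);
    }
}
```

Calling `flush()` from something like the `@Scheduled` method in the question would cover the case where fewer than 300 messages arrive and the queue then goes idle.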
Upvotes: 0
Reputation: 2065
It might also be worth investigating if an "upstream" producer can provide batches of messages instead of individual ones.
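As a rough illustration of what producer-side batching could look like, the sketch below packs several payloads into one message body using a 4-byte length prefix per payload, and splits them again on the consumer side. `BatchFraming`, `pack` and `unpack` are hypothetical names, and the wire format is an assumption chosen for simplicity rather than anything the producer in the question is known to support. Spring AMQP also provides a `BatchingRabbitTemplate` for this purpose; its documentation is the place to check before hand-rolling a format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical framing helper: packs several payloads into one message body. */
final class BatchFraming {
    private BatchFraming() {}

    /** Producer side: write each payload as a 4-byte length followed by its bytes. */
    static byte[] pack(List<byte[]> payloads) {
        int total = 0;
        for (byte[] p : payloads) {
            total += Integer.BYTES + p.length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] p : payloads) {
            out.putInt(p.length).put(p);
        }
        return out.array();
    }

    /** Consumer side: split one batched body back into the original payloads. */
    static List<byte[]> unpack(byte[] body) {
        ByteBuffer in = ByteBuffer.wrap(body);
        List<byte[]> payloads = new ArrayList<>();
        while (in.hasRemaining()) {
            byte[] p = new byte[in.getInt()];
            in.get(p);
            payloads.add(p);
        }
        return payloads;
    }
}
```

With a scheme like this the consumer receives one delivery per batch, so a single save and a single ack cover all of the contained payloads.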
Upvotes: 1