Timothy

Reputation: 1035

Taking only specific message from @RabbitListener

I have a legacy system that sends messages to RabbitMQ. The system uses only one queue, q.finance.invoice, but it carries two types of message, and the message type is available in a header.

The first type

Type : invoice.created
{
  "field_1" : "",
  "field_2" : "",
}

The second type

Type : invoice.paid

{
  "field_5" : "",
  "field_6" : "",
}

So now my consumer needs to handle messages selectively based on their type. Spring has @RabbitHandler, which can do this... IF the message is published by Spring. I cannot use the @RabbitHandler annotation, though. I think this is because @RabbitHandler converts the message based on the __TypeId__ header, which the legacy system does not set.

How can I simulate this @RabbitHandler behaviour (picking messages based on their type)?

So I use @RabbitListener to consume messages. But @RabbitListener takes all types of message. Another reason we use @RabbitListener is that our error handler depends on Message and Channel. The basic method signature we have looks like this:

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        // convert message body JSON string to object
        // process it
    }

I'm trying to do a manual reject based on type, which works. But I'm sure it will not scale when I have many listeners or queues.

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class InvoiceListenerOnMethod {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice paid : {}", message);
    }

}

See, the point is that when I have 4 messages (paid-paid-created-created), the listeners can run more than 4 times, because we cannot control which listener takes which message. So it can go like this for listenInvoicePaid()

In the same way, multiple rejects before an ack can also happen in listenInvoiceCreated().
So in total I can have something like 10 listener calls before all the messages are properly processed.
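To make the extra deliveries concrete, here is a toy model of the reject-and-requeue loop in plain Java. It is only a sketch under strong assumptions that the broker does not guarantee: the consumers take turns in strict round-robin order, and a rejected message goes back to the head of the queue. The class RequeueSimulation and its method are hypothetical names used for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model only: not RabbitMQ code, just a counter for the reject/requeue loop.
class RequeueSimulation {

    // Returns how many deliveries happen before the queue is empty.
    // Assumes every message type has at least one consumer; otherwise this never terminates.
    static int countDeliveries(List<String> messages, List<String> consumerTypes) {
        Deque<String> queue = new ArrayDeque<>(messages);
        int deliveries = 0;
        while (!queue.isEmpty()) {
            // Assumption: consumers are served in strict round-robin order
            String acceptedType = consumerTypes.get(deliveries % consumerTypes.size());
            String message = queue.pollFirst();
            deliveries++;
            if (!acceptedType.equals(message)) {
                // Models channel.basicReject(tag, true): the message returns to the head of the queue
                queue.addFirst(message);
            }
        }
        return deliveries;
    }
}
```

Under these assumptions, calling countDeliveries(List.of("paid", "paid", "created", "created"), List.of("created", "paid")) counts seven deliveries for four messages. A real broker may interleave deliveries differently, so the actual number can be higher or lower.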

Any suggestions to fix the code?

Upvotes: 2

Views: 10727

Answers (3)

Timothy

Reputation: 1035

Possible Implementation

Here is the naive if-else way, thanks Mark. This is your suggestion (1st alternative). As for the 2nd alternative, I can't do it because the publisher is a legacy system whose code I don't have.

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            @Header("type") String type) throws IOException {
        if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
            log.info("Delegate to invoice paid handler");
        } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
            log.info("Delegate to invoice created handler");
        } else {
            log.info("Delegate to default handler");
        }
    }
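If the if-else chain grows with more message types, one possible variation is a lookup table from type to handler. The sketch below is plain Java with no Spring dependency. InvoiceDispatcher and its methods are hypothetical names, not part of Spring AMQP, and the handlers take the raw payload string as in the listener above.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: picks a handler for a payload based on the message type.
class InvoiceDispatcher {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> defaultHandler;

    InvoiceDispatcher(Consumer<String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    // Registers a handler for one type; returns this so calls can be chained.
    InvoiceDispatcher register(String type, Consumer<String> handler) {
        handlers.put(normalize(type), handler);
        return this;
    }

    // Calls the handler registered for the type, or the default handler if there is none.
    void dispatch(String type, String payload) {
        handlers.getOrDefault(normalize(type), defaultHandler).accept(payload);
    }

    // Case-insensitive matching, like StringUtils.equalsIgnoreCase in the listener above.
    private static String normalize(String type) {
        return type == null ? "" : type.toLowerCase(Locale.ROOT);
    }
}
```

The body of the @RabbitListener method would then shrink to a single call such as dispatcher.dispatch(type, message), and adding a message type means one more register(...) call instead of another else-if branch.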

2nd implementation alternative
Here is what I implemented, thanks to Gary. I think this is a cleaner approach. Next, I only need to extract the message post processor into a separate class for maintainability, so I won't clutter my @RabbitListener.

Configuration File

import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Configuration
public class RabbitmqConfig {

    @Bean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
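                // The header may be absent; StringUtils.equalsIgnoreCase below is null-safe
                var typeHeader = message.getMessageProperties().getHeaders().get("type");
                var type = typeHeader == null ? null : typeHeader.toString();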
                String typeId = null;

                if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                    typeId = InvoicePaidMessage.class.getName();
                } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                    typeId = InvoiceCreatedMessage.class.getName();
                }

                Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));

                return message;
            }

        });

        return factory;
    }

    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(converter);
        return template;
    }

}

Listener

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Service
@RabbitListener(queues = "q.finance.invoice")
public class InvoiceListener {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);

    @RabbitHandler
    public void listenInvoiceCreated(InvoiceCreatedMessage message) {
        log.info("Listening invoice created : {}", message);
    }

    @RabbitHandler
    public void listenInvoicePaid(InvoicePaidMessage message) {
        log.info("Listening invoice paid : {}", message);
    }

    @RabbitHandler(isDefault = true)
    public void listenDefault(Message message) {
        log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
    }

}
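For the extraction mentioned above, one option is to move the type-to-class lookup into its own small class and keep the post processor as a thin wrapper around it. This is a minimal plain-Java sketch. InvoiceTypeIdResolver is a hypothetical name, and PaidStub and CreatedStub are stand-ins for InvoicePaidMessage and InvoiceCreatedMessage so that the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Stand-ins for the real payload classes (assumption, for a self-contained example)
class PaidStub {}
class CreatedStub {}

// Hypothetical helper: maps the legacy "type" header value to a class name for __TypeId__.
class InvoiceTypeIdResolver {

    private final Map<String, Class<?>> typeToClass = new HashMap<>();

    InvoiceTypeIdResolver(Map<String, Class<?>> mappings) {
        // Keys are stored in lower case so that lookups are case-insensitive
        mappings.forEach((type, target) -> typeToClass.put(type.toLowerCase(Locale.ROOT), target));
    }

    // Returns the class name for a known type, or empty when the header is missing or unknown.
    Optional<String> resolve(Object typeHeader) {
        if (typeHeader == null) {
            return Optional.empty();
        }
        Class<?> target = typeToClass.get(typeHeader.toString().toLowerCase(Locale.ROOT));
        return Optional.ofNullable(target).map(Class::getName);
    }
}
```

Inside postProcessMessage, the if-else block could then become something like resolver.resolve(message.getMessageProperties().getHeaders().get("type")).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t)), where resolver is built once with the two real message classes.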

Upvotes: 2

Gary Russell

Reputation: 174749

You can add a MessagePostProcessor to the container factory's afterReceivePostProcessors property. In the post processor, you can examine the JSON body and set the __TypeId__ header to the appropriate class name.

See this answer for an example.

Upvotes: 4

Mark Bramnik

Reputation: 42541

I haven't worked with the Spring integration for RabbitMQ, but all in all, the idea of having a single queue that handles different message types sounds problematic:

Many consumers will potentially get messages of types that they can't handle and will have to reject them, so the message goes back to Rabbit, and then again and again... The performance of the whole cluster can degrade because of this.

So I think there are two paths you can follow:

  • Implement a single listener that can handle both types of messages. There is no need to change Rabbit, but it can be a challenging refactoring on the Java side.

  • Fortunately, RabbitMQ is very flexible when it comes to routing messages. Configure an exchange to route messages of type A to queue A and messages of type B to queue B, based on routing key, header, or whatever. There are different types of exchanges in Rabbit, and you'll surely find a configuration that works for you.

I personally would go with the second path.

Upvotes: 0
