Reputation: 1035
I have a legacy system that sends messages to RabbitMQ.
The system uses only one queue, q.finance.invoice,
but it publishes two types of message; the message type is available in a header.
The first type
Type : invoice.created
{
    "field_1" : "",
    "field_2" : ""
}
The second type
Type : invoice.paid
{
    "field_5" : "",
    "field_6" : ""
}
So now my consumer needs to handle messages selectively, based on their type.
Spring has @RabbitHandler, which can do this... IF the message is published by Spring.
I cannot use the @RabbitHandler annotation, though.
I think this is because @RabbitHandler converts the message based on the __TypeId__ header, which the legacy system does not set.
How can I simulate this @RabbitHandler behaviour (dispatching on the message's type)?
So I use @RabbitListener to consume the messages, but @RabbitListener receives all types of message.
Another reason we use @RabbitListener is that our error handler depends on Message and Channel.
The basic method signature we have looks like this:
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    // convert message body JSON string to object
    // process it
}
I'm trying to reject manually based on type, which works. But I'm sure it will not scale when I have many listeners or queues:
import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class InvoiceListenerOnMethod {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        // Not our type: reject and requeue so the other listener can pick it up
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }
        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        // Not our type: reject and requeue so the other listener can pick it up
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }
        log.info("[on Method] Listening invoice paid : {}", message);
    }
}
See, the point is that when I have 4 messages (paid-paid-created-created), the listeners can run more than 4 times, because we cannot control which listener takes which message. For example, listenInvoicePaid() may issue several reject() calls before it finally gets to ack() a message.
In the same way, multiple reject() calls before an ack() can also happen in listenInvoiceCreated().
So in total I can end up with 10 or so message deliveries before all the messages are properly processed.
Any suggestions on how to fix the code?
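To make the extra deliveries concrete, here is a small plain-Java model of the reject-and-requeue loop. It is only a sketch under simplifying assumptions that are not guaranteed by the broker: strict round-robin dispatch between the two listeners, and a rejected message going straight back to the head of the queue. The class and method names are hypothetical and no RabbitMQ client is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class RequeueSimulation {

    /**
     * Counts deliveries until every message has been accepted by the
     * consumer that handles its type. A type with no matching consumer
     * would loop forever, much like endless requeueing on a real broker.
     */
    static int countDeliveries(List<String> messages, List<String> consumerTypes) {
        Deque<String> queue = new ArrayDeque<>(messages);
        int deliveries = 0;
        while (!queue.isEmpty()) {
            String type = queue.pollFirst();
            // Assumption: strict round-robin dispatch between consumers.
            String handledType = consumerTypes.get(deliveries % consumerTypes.size());
            deliveries++;
            if (!handledType.equals(type)) {
                // Models channel.basicReject(tag, true): back to the head of the queue.
                queue.addFirst(type);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        int deliveries = countDeliveries(
                List.of("invoice.paid", "invoice.paid", "invoice.created", "invoice.created"),
                List.of("invoice.created", "invoice.paid"));
        System.out.println("deliveries = " + deliveries);
    }
}
```

Under these assumptions the four messages take more than four deliveries; with real prefetch settings and dispatch order the number can be higher or lower.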
Upvotes: 2
Views: 10727
Reputation: 1035
Possible Implementation
Here is the naive if-else way, thanks Mark. This is your suggestion (1st alternative). As for the 2nd alternative, I can't do it because the publisher is a legacy system whose code I don't have.
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
        @Header("type") String type) throws IOException {
    if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
        log.info("Delegate to invoice paid handler");
    } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
        log.info("Delegate to invoice created handler");
    } else {
        log.info("Delegate to default handler");
    }
}
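If the if-else chain grows with more message types, one option is a lookup table from type to handler. The following is a minimal sketch in plain Java, not tied to Spring; InvoiceDispatcher and the handler bodies are hypothetical placeholders, and the listener method above would simply call dispatch(type, message).

```java
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

public class InvoiceDispatcher {

    // Hypothetical handlers: each takes the raw JSON body and returns a label
    // naming the handler that ran, so the routing is easy to observe.
    private final Map<String, Function<String, String>> handlers = Map.of(
            "invoice.paid", body -> "paid handler",
            "invoice.created", body -> "created handler");

    /** Routes the body to the handler registered for the type, ignoring case. */
    public String dispatch(String type, String body) {
        String key = type == null ? "" : type.toLowerCase(Locale.ROOT);
        // Unknown or missing types fall through to a default handler.
        return handlers.getOrDefault(key, b -> "default handler").apply(body);
    }

    public static void main(String[] args) {
        InvoiceDispatcher dispatcher = new InvoiceDispatcher();
        System.out.println(dispatcher.dispatch("invoice.paid", "{}"));
        System.out.println(dispatcher.dispatch("something.else", "{}"));
    }
}
```

Adding a new message type then means adding one map entry rather than another branch.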
2nd implementation alternative
Here is what I implemented, thanks to Gary. I think this is a cleaner approach. Next, I only need to extract the message post processor into a separate class for maintainability, so I don't clutter my @RabbitListener
Configuration File
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Configuration
public class RabbitmqConfig {

    @Bean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        // Translate the legacy "type" header into the __TypeId__ header that the JSON converter reads
        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // The header may be absent; avoid a NullPointerException in that case
                Object typeHeader = message.getMessageProperties().getHeaders().get("type");
                String type = typeHeader == null ? null : typeHeader.toString();
                String typeId = null;

                if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                    typeId = InvoicePaidMessage.class.getName();
                } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                    typeId = InvoiceCreatedMessage.class.getName();
                }

                // Unknown types keep no __TypeId__ and go to the default handler
                Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));
                return message;
            }
        });

        return factory;
    }

    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Reuse the injected converter bean instead of creating a second instance
        template.setMessageConverter(converter);
        return template;
    }
}
Listener
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Service
@RabbitListener(queues = "q.finance.invoice")
public class InvoiceListener {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);

    @RabbitHandler
    public void listenInvoiceCreated(InvoiceCreatedMessage message) {
        log.info("Listening invoice created : {}", message);
    }

    @RabbitHandler
    public void listenInvoicePaid(InvoicePaidMessage message) {
        log.info("Listening invoice paid : {}", message);
    }

    @RabbitHandler(isDefault = true)
    public void listenDefault(Message message) {
        log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
    }
}
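As a rough idea of how the type mapping could be pulled out of the configuration class, here is a sketch of a standalone resolver in plain Java. InvoiceTypeIdResolver is a hypothetical name, and the two nested classes are stand-ins for the real payload classes so the example compiles on its own; a MessagePostProcessor implementation could delegate to resolveTypeId and set the __TypeId__ header when a value is present.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class InvoiceTypeIdResolver {

    // Hypothetical stand-ins for the real payload classes.
    static class InvoiceCreatedMessage { }
    static class InvoicePaidMessage { }

    // Legacy "type" header value (lower case) -> target payload class.
    private static final Map<String, Class<?>> TYPES = Map.of(
            "invoice.created", InvoiceCreatedMessage.class,
            "invoice.paid", InvoicePaidMessage.class);

    /** Returns the class name to use as __TypeId__, or empty for unknown or missing types. */
    static Optional<String> resolveTypeId(Object typeHeader) {
        if (typeHeader == null) {
            return Optional.empty();
        }
        Class<?> target = TYPES.get(typeHeader.toString().toLowerCase(Locale.ROOT));
        return Optional.ofNullable(target).map(Class::getName);
    }

    public static void main(String[] args) {
        System.out.println(resolveTypeId("invoice.paid").isPresent());
        System.out.println(resolveTypeId("unknown").isPresent());
    }
}
```

Keeping the mapping in one table means a new message type only needs a new entry and a matching @RabbitHandler method.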
Upvotes: 2
Reputation: 174749
You can add a MessagePostProcessor to the container factory's afterReceivePostProcessors property. In the post processor, you can examine the JSON body and set the __TypeId__ header to the appropriate class name.
See this answer for an example.
Upvotes: 4
Reputation: 42541
I haven't worked with the Spring integration for RabbitMQ, but all in all, having a single queue that handles different message types sounds problematic:
Many consumers will potentially get messages of types that they can't handle and will have to reject them, so the message goes back to RabbitMQ, and then again and again... The performance of the whole cluster can degrade because of this.
So I think there are two paths you can follow:
1. Implement a single listener that can handle both types of message. No need to change RabbitMQ, but it can be a challenging refactoring on the Java side.
2. Fortunately, RabbitMQ is very flexible when it comes to routing messages. Configure an exchange to route messages of type A to queue A and messages of type B to queue B, based on a routing key, a header, or whatever. There are different types of exchange in RabbitMQ, and you should be able to find a configuration that works for you.
I personally would go with the second path.
Upvotes: 0