Reputation: 325
I am using Spring AMQP with RabbitMQ.
I have a use case where my consumer relies on another system, X, which may have downtime. When X is down, the listener gets an XDownException. On that exception I would like to stop consuming from the queue and keep requeueing the current message until XDownException no longer occurs. That way no messages are lost while X is down, and processing resumes automatically as soon as X is back up.
FIFO ordering is a must. The listener throws XDownException while processing the message. The listener is not transaction-aware right now, but we can make it transaction-aware if that helps. However, I don't want this behaviour for every kind of exception. Is there a way to do this with Spring AMQP?
Also, is there a better way to accomplish this? I have no event that signals when X comes back up.
Upvotes: 0
Views: 835
Reputation: 1321
The FIFO requirement dictates that you have no more than one concurrent consumer in your container setup. Assuming this setup, your POJO method receives messages one at a time, and the next message is not delivered until the current one has been fully processed.
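As a rough illustration of why a single consumer that retries in place keeps messages in order, here is a minimal plain-Java sketch with no broker involved. FlakyX, FifoRetryDemo and the local XDownException are hypothetical stand-ins invented for this example, not Spring AMQP classes; the in-memory queue only mimics a queue with one consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the exception thrown while system X is unavailable.
class XDownException extends RuntimeException {
}

// Hypothetical stand-in for system X: fails a fixed number of calls, then recovers.
class FlakyX {
    private int failuresLeft;
    final List<String> processed = new ArrayList<>();

    FlakyX(int failuresLeft) {
        this.failuresLeft = failuresLeft;
    }

    void process(String message) {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new XDownException();
        }
        processed.add(message);
    }
}

class FifoRetryDemo {
    // Drains the queue with a single consumer, retrying the head message in place.
    static List<String> drain(Queue<String> queue, FlakyX x) {
        while (!queue.isEmpty()) {
            String head = queue.peek(); // look at the head without removing it
            try {
                x.process(head);
                queue.remove(); // "ack": remove only after successful processing
            } catch (XDownException e) {
                // X is down: leave the head where it is and try again.
                // A real listener would wait here before retrying.
            }
        }
        return x.processed;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(drain(queue, new FlakyX(2)));
    }
}
```

Running main prints [m1, m2, m3]: the head message is retried until X recovers, and nothing behind it can overtake it. Only XDownException triggers the retry, so any other exception would propagate instead of being retried.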
The following outline shows a strategy for the use case you described.
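import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import com.rabbitmq.client.Channel;

public class ExtendedListenerAdapter extends MessageListenerAdapter {
    // pass the channel and the raw message to the delegate so it can ack manually
    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[] {extractedMessage, channel, message};
    }
}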
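import java.io.IOException;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;

public class MyListener {

    public void handleMessage(Object object, Channel channel, Message message) throws IOException {
        try {
            processMessage(object);
        } catch (XDownException xdex) {
            // X is down: block here and retry until X is available again
            waitForXAvailability(object);
        } catch (OtherException oex) {
            // any other failure: handle or log it here; the message is not retried
        } finally {
            // acknowledge manually once the message has been dealt with
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    // XDownException and OtherException are assumed to be unchecked application exceptions
    private void processMessage(Object object) /* throws all types of exceptions */ {
        // process the message as usual
    }

    private void waitForXAvailability(Object object) {
        for (;;) {
            // add a delay here; exponential backoff with an upper bound is recommended
            // also consider an upper bound on the number of iterations; it is infinite now
            try {
                processMessage(object);
                return; // no exception means X is up again
            } catch (XDownException xdex) {
                // X is down, let the loop continue
            }
        }
    }
}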
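import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueNames("some.queue");
        // one consumer and one unacknowledged message at a time keeps FIFO order
        container.setConcurrentConsumers(1);
        container.setPrefetchCount(1);
        // the listener acknowledges each message itself via channel.basicAck
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(myAdapter());
        return container;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public ExtendedListenerAdapter myAdapter() {
        ExtendedListenerAdapter adapter = new ExtendedListenerAdapter();
        adapter.setDelegate(myListener());
        return adapter;
    }

    @Bean
    public MyListener myListener() {
        return new MyListener();
    }
}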
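The comments in waitForXAvailability recommend an exponential backoff delay with an upper bound, plus a limit on the number of iterations. One possible way to do that in plain Java is sketched below. BackoffRetry, delayMillis and retry are hypothetical names made up for this example, and the concrete delay values are assumptions to be tuned.

```java
class BackoffRetry {
    // Delay before the next try after the given 0-based attempt:
    // initialMs * 2^attempt, capped at maxDelayMs.
    static long delayMillis(int attempt, long initialMs, long maxDelayMs) {
        // Clamp the shift so that realistic initial delays stay well within long range.
        int shift = Math.min(attempt, 30);
        long delay = initialMs * (1L << shift);
        return Math.min(delay, maxDelayMs);
    }

    // Runs the task until it succeeds or maxAttempts tries have failed.
    // Only exceptions of type retryOn are retried; anything else is rethrown.
    // Returns true on success, false if the attempts ran out or the thread was interrupted.
    static boolean retry(Runnable task, Class<? extends RuntimeException> retryOn,
                         int maxAttempts, long initialMs, long maxDelayMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                task.run();
                return true;
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // not the "X is down" case: do not retry
                }
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(delayMillis(attempt, initialMs, maxDelayMs));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Delays for six attempts with a 100 ms start and a 2000 ms cap.
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println(delayMillis(attempt, 100, 2000));
        }
    }
}
```

With these values main prints 100, 200, 400, 800, 1600 and 2000, one per line. In the listener, a call such as BackoffRetry.retry(() -> processMessage(object), XDownException.class, ...) could replace the bare for (;;) loop, assuming XDownException is unchecked. What to do when retry returns false (for example, rejecting the message instead of acknowledging it) is left open here.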
Please tune the outline above to best suit your needs.
Upvotes: 1