Ankit Gupta

Reputation: 325

Making a Spring AMQP consumer stop consuming messages for a specific type of exception

I am using Spring AMQP with RabbitMQ.

I have a use case where my consumer relies on another system, X, which may have downtime. When X is down, my listener gets an XDownException, and I would like to handle it as follows: stop processing messages from the queue and keep requeueing the current message until I no longer get XDownException. That way I would not lose any messages while X is down, and consumption would resume automatically as soon as X is back up.

FIFO is a must. The XDownException is thrown to the listener while it is processing a message. The listener is not transaction-aware right now, but we can make it transaction-aware if that helps. However, I don't want to do this for every kind of exception. Is there a way I can do this with Spring AMQP?

Also, is there a better way to accomplish this than this approach? I don't have an event for when X comes up.

Upvotes: 0

Views: 835

Answers (1)

Avnish

Reputation: 1321

The FIFO requirement dictates that you must have no more than one concurrent consumer in your container setup. Presuming this setup, you'll receive messages one at a time in your POJO method. The next message won't be delivered until the current one is fully processed.

The following strategy should work for the use case you described.

  1. Make sure you set the prefetch count to 1 and use manual acknowledge mode on your container. Full details are available at Asynchronous Consumer
  2. Your setup should look like the following:
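import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

// Passes the channel and the raw message to the delegate so it can ack manually.
public class ExtendedListenerAdapter extends MessageListenerAdapter {
    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }
}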


public class MyListener {
    public void handleMessage(Object object, Channel channel, Message message) throws IOException {
        try {
            processMessage(object);
        } catch (XDownException xdex) {
            // X is down: block here so no further message is consumed
            waitForXAvailability(object);
        } catch (OtherException oex) {
            // handle or log other failures; the message is still acked below
        } finally {
            // manual ack, sent only once we are done with this message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    private void processMessage(Object object) /* throws all types of exceptions */ {
        // process the message as usual
    }

    private void waitForXAvailability(Object object) {
        for (;;) {
            // add a delay here; exponential backoff with an upper bound is recommended
            // also add an upper bound on the number of iterations; it's infinite now
            try {
                processMessage(object);
                return; // no exception means X is up again
            } catch (XDownException xdex) {
                // X is still down, let the loop continue
            }
        }
    }
}

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueNames("some.queue");
        // one unacked message at a time, acknowledged by the listener itself
        container.setPrefetchCount(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(myAdapter());
        return container;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public ExtendedListenerAdapter myAdapter() {
        ExtendedListenerAdapter adapter = new ExtendedListenerAdapter();
        adapter.setDelegate(myListener());
        return adapter;
    }

    @Bean
    public MyListener myListener() {
        return new MyListener();
    }
}

Please tune the above outline to best suit your needs. The following links should give you additional helpful information:

  1. Spring RabbitMQ - using manual channel acknowledgement
  2. Spring AMQP - Receiving Messages
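
The comments in waitForXAvailability suggest an exponential backoff delay with an upper bound and a cap on the number of iterations. A minimal sketch of that loop in plain Java could look like the following. The names BackoffRetry, delayMillis, retryUntilUp and Sleeper are made up for illustration, and XDownException here is only a stand-in for your own exception type; none of this is part of Spring AMQP.

```java
final class BackoffRetry {

    /** Hypothetical stand-in for the exception thrown while X is down. */
    static class XDownException extends RuntimeException {}

    /** Abstraction over Thread.sleep so the delay can be replaced in tests. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /** Delay for a 0-based attempt: baseMillis doubled per attempt, capped at maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        // Stop doubling once the cap is reached, so the value cannot grow unbounded.
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /**
     * Waits, then runs the task; repeats while the task throws XDownException.
     * Returns true once the task succeeds, false if maxAttempts is exhausted
     * or the waiting thread is interrupted.
     */
    static boolean retryUntilUp(Runnable task, int maxAttempts,
                                long baseMillis, long maxMillis, Sleeper sleeper) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                sleeper.sleep(delayMillis(attempt, baseMillis, maxMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
            try {
                task.run();
                return true; // no exception means X is up again
            } catch (XDownException e) {
                // X is still down; the next pass waits longer
            }
        }
        return false;
    }
}
```

Inside waitForXAvailability you could call something like BackoffRetry.retryUntilUp(() -> processMessage(object), 20, 500, 60_000, Thread::sleep) and decide what to do with the message when it returns false. Note that with the finally block above, the message would still be acked in that case, so you may want to nack and requeue it instead.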

Upvotes: 1
