Jim M.

Reputation: 1009

DefaultMessageListenerContainer stops processing messages

I'm hoping this is a simple configuration issue but I can't seem to figure out what it might be.

Problem

My application starts up fine and begins to process messages from Amazon SQS. After some amount of time I see the following warning

2020-02-01 04:16:21.482 LogLevel=WARN 1 --- [ecutor-thread14] o.s.j.l.DefaultMessageListenerContainer : Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers.

The above warning gets printed multiple times and eventually I see the following two INFO messages

2020-02-01 04:17:51.552 LogLevel=INFO 1 --- [ecutor-thread40] c.a.s.javamessaging.SQSMessageConsumer : Shutting down ConsumerPrefetch executor

2020-02-01 04:18:06.640 LogLevel=INFO 1 --- [ecutor-thread40] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor

The above two messages display several times, and at some point no more messages are consumed from SQS. I don't see anything else in my log to indicate an issue, but my handlers (I have 2~) stop reporting that they are processing messages, and I can see the AWS SQS queue growing in both message count and message age.

~: This exact code was working fine when I had a single handler; the problem started when I added the second one.

Configuration/Code

I realize the first WARN message is caused by the concurrency settings of the ThreadPoolTaskExecutor, but I cannot find a configuration that works properly. Here is my current configuration for the JMS pieces. I have tried various max pool sizes with no real effect other than the warnings starting sooner or later depending on the pool size:

    public ThreadPoolTaskExecutor asyncAppConsumerTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setThreadGroupName("asyncConsumerTaskExecutor");
        taskExecutor.setThreadNamePrefix("asyncConsumerTaskExecutor-thread");
        taskExecutor.setCorePoolSize(10);
        // Allow the thread pool to grow up to 4 times the core size, evidently not
        // having the pool be larger than the max concurrency causes the JMS queue
        // to barf on itself with messages like
        // "Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers"
        taskExecutor.setMaxPoolSize(10 * 4);
        taskExecutor.setQueueCapacity(0); // do not queue up messages
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        return taskExecutor;
    }

Here is the JMS Container Factory we create

    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SQSConnectionFactory sqsConnectionFactory, ThreadPoolTaskExecutor asyncConsumerTaskExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(sqsConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        // The JMS processor will start 'concurrency' number of tasks
        // and supposedly will increase this to the max of '10 * 3'
        factory.setConcurrency(10 + "-" + (10 * 3));
        factory.setTaskExecutor(asyncConsumerTaskExecutor);
        // Let the task process 100 messages, default appears to be 10
        factory.setMaxMessagesPerTask(100);
        // Wait up to 5 seconds for a timeout, this keeps the task around a bit longer
        factory.setReceiveTimeout(5000L);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

I added the setMaxMessagesPerTask and setReceiveTimeout calls based on things I found online; the problem persists without them and at various other settings (50, 2500L, 25, 1000L, etc.).

We create a default SQS connection factory

    public SQSConnectionFactory sqsConnectionFactory(AmazonSQS amazonSQS) {
        return new SQSConnectionFactory(new ProviderConfiguration(), amazonSQS);
    }

Finally the handlers look like this

    @JmsListener(destination = "consumer-event-queue")
    public void receiveEvents(String message) throws IOException {
        MyEventDTO myEventDTO = jsonObj.readValue(message, MyEventDTO.class);
        //messageTask.process(myEventDTO);
    }

    @JmsListener(destination = "myalert-sqs")
    public void receiveAlerts(String message) throws IOException, InterruptedException {
        final MyAlertDTO myAlert = jsonObj.readValue(message, MyAlertDTO.class);
        myProcessor.addAlertToQueue(myAlert);
    }

You can see that in the first function (receiveEvents) we just take the message from the queue and exit; we have not implemented the processing code for it yet. The second function (receiveAlerts) takes the message, and the myProcessor.addAlertToQueue call creates a runnable object and submits it to a thread pool to be processed at some point in the future.
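For reference, here is a rough sketch of what addAlertToQueue does (simplified for this post; the MyAlertProcessor class shape, the pool size, and the handle method are placeholders, not the exact production code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical sketch only -- the real processor is not shown in this post.
    // It hands the alert off to its own worker pool so the JMS listener thread
    // returns quickly instead of doing the processing inline.
    public class MyAlertProcessor {

        private final ExecutorService workerPool = Executors.newFixedThreadPool(4);

        public void addAlertToQueue(MyAlertDTO alert) {
            // Wrap the alert in a runnable and submit it for later processing
            workerPool.submit(() -> handle(alert));
        }

        private void handle(MyAlertDTO alert) {
            // actual alert handling happens here (omitted)
        }
    }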

The problem (the warnings, the info messages, and the failure to consume messages) only started when we added the receiveAlerts function; previously the other function was the only one present and we did not see this behavior.

More

This is part of a larger project and I am working on breaking this code out into a smaller test case to see if I can duplicate this issue. I will post a follow-up with the results.

In the Meantime

I'm hoping this is just a configuration issue and that someone more familiar with this stack can tell me what I'm doing wrong, or offer some thoughts on how to get this working properly.

Thank you!

Upvotes: 2

Views: 2433

Answers (1)

Jim M.

Reputation: 1009

After fighting this one for a bit I think I finally resolved it.

The issue appears to be with the "DefaultJmsListenerContainerFactory": this factory creates a new "DefaultMessageListenerContainer" for EACH method annotated with '@JmsListener'. The person who originally wrote the code thought the factory was invoked only once for the application and that the created container would be reused. So the issue was two-fold:

  1. The 'ThreadPoolTaskExecutor' attached to the factory had 40 threads. When the application had one '@JmsListener' method this worked fine, but when we added a second method each container got 10 threads (20 in total) for listening. That by itself is fine; however, since we stated that each listener could grow up to 30 consumers, we quickly ran out of threads in that shared pool. This is what caused the "Number of scheduled consumers has dropped below concurrentConsumers limit" warning.
  2. This is probably obvious given the above, but I want to call it out explicitly: in the listener container factory we set the concurrency to "10-30", but all of the listener containers have to share the executor's pool. The max concurrency therefore has to be set so that, even if every listener scales up to its maximum, the total does not exceed the number of threads in the pool (e.g. with 2 '@JmsListener' annotated methods and a pool of 40 threads, the max can be no more than 20). A sketch of the adjusted configuration is shown below.
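To make the sizing concrete, here is roughly the shape of the configuration after the fix (the numbers are illustrative and assume 2 listener methods; the point is that listeners × max concurrency must not exceed the executor's max pool size):

    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import com.amazon.sqs.javamessaging.SQSConnectionFactory;

    // Illustrative sizing only -- adjust to your own listener count and load.
    // With 2 @JmsListener methods sharing this factory, each container may grow
    // to 20 consumers, so the shared pool needs at least 2 * 20 = 40 threads.
    public ThreadPoolTaskExecutor asyncAppConsumerTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2 * 10);  // 2 listeners * minimum concurrency of 10
        taskExecutor.setMaxPoolSize(2 * 20);   // 2 listeners * maximum concurrency of 20
        taskExecutor.setQueueCapacity(0);      // do not queue consumer tasks
        return taskExecutor;
    }

    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            SQSConnectionFactory sqsConnectionFactory, ThreadPoolTaskExecutor taskExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(sqsConnectionFactory);
        // Each container created from this factory can now scale from 10 to 20
        // consumers without exhausting the shared 40-thread pool.
        factory.setConcurrency("10-20");
        factory.setTaskExecutor(taskExecutor);
        return factory;
    }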

Hopefully this might help someone else with a similar issue in the future....

Upvotes: 5
