Hbargujar

Reputation: 400

SpringBoot @SqsListener - not working - with Exception - TaskRejectedException

I have an AWS SQS queue with 5000 messages already on it (a sample message looks like 'Hello @ 1'). I created a Spring Boot application and, inside one of the component classes, added a method to read messages from the queue.

package com.example.aws.sqs.service;

import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class MessageReceiverService {

    @SqsListener(value = { "${cloud.aws.sqs.url}" }, deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
    public void readMessage(String message) {
        log.info("Reading Message... {}", message);
    }

}

My main Spring Boot class:

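import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AwsSqsApplicationConsumer {
    public static void main(String[] args) {
        SpringApplication.run(AwsSqsApplicationConsumer.class, args);
    }
}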

Exception I get when the application runs:

s.c.a.m.l.SimpleMessageListenerContainer : An Exception occurred while polling queue '<my sqs name>'. The failing operation will be retried in 10000 milliseconds
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:309) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2 rejected from java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_65]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:306) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
... 6 common frames omitted

I am NOT configuring any custom executor services; I'm using the preconfigured Spring beans. Versions: springBootVersion = '2.0.3.RELEASE', springCloudVersion = 'Finchley.RELEASE'.

Upvotes: 16

Views: 24666

Answers (6)

Shubham Jaiswal

Reputation: 41

Try adding the @EnableSqs annotation to the MessageReceiverService class.

Upvotes: 2

user2273176

Reputation: 71

I believe it is a bug or oversight in Spring. The issue stems from the default values of:

public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {

    private static final int DEFAULT_WORKER_THREADS = 2;

and

abstract class AbstractMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;

If maxNumberOfMessages is not set, the container uses 10 as the number of messages to pull from SQS per poll but 2 as the number of workers in the task executor. This means that if a poll returns 3 or more messages at once, you get that exception. If you manually set maxNumberOfMessages to a value (any value), it is used in both places, which keeps the two values in sync, as I believe is intended:

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            SimpleMessageListenerContainerFactory factory, QueueMessageHandler messageHandler)
    {
        SimpleMessageListenerContainer container = factory.createSimpleMessageListenerContainer();
        container.setMaxNumberOfMessages(5);
        container.setMessageHandler(messageHandler);
        return container;
    }
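
The mismatch described above can be written out as plain arithmetic. This is only a sketch, not the library's code: the class and method names are hypothetical, it assumes a single queue, and the "+ 1" is inferred from the pool sizes of 3 and 11 reported in this thread rather than taken from the Spring Cloud AWS source.

```java
public class PoolSizingSketch {

    // Default values quoted in the answer above
    static final int DEFAULT_WORKER_THREADS = 2;
    static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;

    /** Messages requested per poll: falls back to 10 when nothing is configured. */
    static int pollBatchSize(Integer configuredMax) {
        return configuredMax != null ? configuredMax : DEFAULT_MAX_NUMBER_OF_MESSAGES;
    }

    /**
     * Max pool size of the default executor: falls back to 2 workers when nothing
     * is configured. The "+ 1" is an assumption that matches the reported sizes.
     */
    static int maxPoolSize(Integer configuredMax) {
        int workers = configuredMax != null ? configuredMax : DEFAULT_WORKER_THREADS;
        return workers + 1;
    }

    public static void main(String[] args) {
        // Nothing configured: up to 10 messages per poll, but only 3 threads
        System.out.println(pollBatchSize(null) + " messages, " + maxPoolSize(null) + " threads");
        // setMaxNumberOfMessages(5): both values now derive from the same number
        System.out.println(pollBatchSize(5) + " messages, " + maxPoolSize(5) + " threads");
    }
}
```

With nothing configured the two fallbacks disagree (10 versus 2), which is the gap the explicit setting closes.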

Upvotes: 5

Matt Garner

Reputation: 165

I can't comment on the previous answers, so I'm posting this to explain why the issue occurs and why setting maxNumberOfMessages fixes it. Hopefully, the following helps to clarify everything.

SimpleMessageListenerContainer's default ThreadPoolTaskExecutor is configured with a core pool size of 2 threads, a max pool size of 3 threads and a queue capacity of 0. However, the default maximum number of messages returned by a single poll to Amazon SQS is 10. If 10 messages are available in a single poll, there aren't enough threads to process them, and with no queue to hold the overflow the executor throws the RejectedExecutionException.

Calling setMaxNumberOfMessages(10) on SimpleMessageListenerContainerFactory raises the max thread pool size to 11, which should make enough threads available. It doesn't change the queue capacity.
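
The rejection can be reproduced with the JDK alone, without Spring or SQS. The sketch below is an illustration under assumptions: the class and method names are hypothetical, and a SynchronousQueue stands in for "queue capacity 0" (which is what Spring's ThreadPoolTaskExecutor uses when the capacity is not positive). Each task blocks until released, so every thread stays busy while the burst is submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {

    /** Submits `tasks` blocking tasks to a queue-less pool and returns how many were rejected. */
    static int countRejected(int corePoolSize, int maxPoolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>()); // no buffering: hand off to a thread or reject
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(release)); // keeps the thread busy
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy, as in the stack trace
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(countRejected(2, 3, 10));  // prints 7
        System.out.println(countRejected(2, 11, 10)); // prints 0
    }
}
```

In the real container the polling loop also appears to run on the same executor (the stack trace shows AsynchronousMessageListener.run inside a pool worker), so the number of threads left for messages is likely one lower than in this simplified model.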

To set the queue capacity, a separate TaskExecutor can be initialised and set on the SimpleMessageListenerContainerFactory bean as follows:

@Bean(name = "sqsAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor(@Value("${external.aws.sqs.core-thread-count}") int coreThreadCount,
                                           @Value("${external.aws.sqs.max-thread-count}") int maxThreadCount,
                                           @Value("${external.aws.sqs.queue-capacity}") int queueCapacity) {
    ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
    asyncTaskExecutor.setCorePoolSize(coreThreadCount);
    asyncTaskExecutor.setMaxPoolSize(maxThreadCount);
    asyncTaskExecutor.setQueueCapacity(queueCapacity);
    asyncTaskExecutor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
    asyncTaskExecutor.initialize();
    return asyncTaskExecutor;
}

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS, @Qualifier("sqsAsyncTaskExecutor") AsyncTaskExecutor asyncTaskExecutor) {
    SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
    // Pass the injected client explicitly so the parameter is actually used
    simpleMessageListenerContainerFactory.setAmazonSqs(amazonSQS);
    // Replace the default executor (queue capacity 0) with the one defined above
    simpleMessageListenerContainerFactory.setTaskExecutor(asyncTaskExecutor);
    return simpleMessageListenerContainerFactory;
}

The values I used were coreThreadCount = 5, maxThreadCount = 20, queueCapacity = 10.

As I've already said, I think setting maxNumberOfMessages to 10 on SimpleMessageListenerContainerFactory should be enough to process all the messages fetched in a single request. However, if you need more precise control over the TaskExecutor, this configuration works as well.
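
To see what a non-zero queue capacity changes, here is a JDK-only sketch using the values quoted above (5, 20, 10). The class and method names are hypothetical, and a bounded LinkedBlockingQueue stands in for the queue that Spring's ThreadPoolTaskExecutor creates when the capacity is positive. Tasks block until released so the pool state can be inspected deterministically.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueuedPoolDemo {

    /** Returns {threads started, tasks queued, tasks rejected} after a burst of blocking tasks. */
    static int[] submitBurst(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        // Snapshot before releasing the tasks, while every thread is still busy
        int[] state = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();
        pool.shutdown();
        return state;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] batch = submitBurst(5, 20, 10, 10);
        System.out.println(batch[0] + " threads, " + batch[1] + " queued, " + batch[2] + " rejected");
        // prints "5 threads, 5 queued, 0 rejected"
    }
}
```

Note the ordering: the executor fills the queue before it starts threads beyond the core size, so with these values a batch of 10 runs on 5 threads while the other 5 messages wait. Rejection only begins once max threads plus queue capacity (20 + 10) are exhausted.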

Upvotes: 2

Shrikant Pandit

Reputation: 31

I solved this problem using a Spring event listener. The code follows; I hope it helps.

In this solution, once all the beans have been initialized, a new task executor with a bigger pool size is assigned to the container.

@Component
public class PostBeansConstructionListener{

    @EventListener
    public void handleContextRefreshedEvent(ContextRefreshedEvent event){
        final ApplicationContext applicationContext = event.getApplicationContext();
        final SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
        setAsyncTaskExecutor(simpleMessageListenerContainer);
    }

    private void setAsyncTaskExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) {
        try{
            simpleMessageListenerContainer.setTaskExecutor(getAsyncExecutor());
        }catch(Exception ex){
            throw new RuntimeException("Not able to create Async Task Executor for SimpleMessageListenerContainer.", ex);
        }
    }

    public AsyncTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
        executor.initialize();
        return executor;
    }
}

Upvotes: 1

Ganesh Satpute

Reputation: 3961

The problem is with the listener thread configuration. Look at this part of the exception:

...
ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]]
...

The default thread pool size is smaller than what you need.

Add the following configuration to your Spring application:

@Configuration
public class TasksConfiguration implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(5); // TODO: Load this from configuration
        taskScheduler.initialize();
        taskRegistrar.setTaskScheduler(taskScheduler);
    }
}

Now you should be able to process these tasks.

P.S. Whatever tasks were rejected earlier will be picked up again later, after a certain period.

Edit: I think people were put off by the number in the original line, .setPoolSize(5000). It's configurable, so choose whatever number suits your requirements. For this answer, I've reduced it to a smaller number.

Upvotes: 3

Zaid Direya

Reputation: 641

Setting the max number of messages seems to solve the issue:

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS){
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    factory.setMaxNumberOfMessages(10);
    return factory;
}

Upvotes: 17
