Reputation: 400
I have a AWS SQS with 5000 messages already on the Queue (Sample Message looks like this 'Hello @ 1') I created a SpringBoot Application and inside one of the Component Classes create a method to read messages from the SQS.
package com.example.aws.sqs.service;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class MessageReceiverService {
@SqsListener(value = { "${cloud.aws.sqs.url}" }, deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
public void readMessage(String message){
log.info("Reading Message... {}", message);
}
}
My main SpringBoot Class
@SpringBootApplication
public class AwsSqsApplicationConsumer {
public static void main(String[] args) {
SpringApplication.run(AwsSqsApplicationConsumer.class, args);
}
}
Exception I get when the application runs:
s.c.a.m.l.SimpleMessageListenerContainer : An Exception occurred while polling queue '<my sqs name>'. The failing operation will be retried in 10000 milliseconds
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:309) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2 rejected from java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_65]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:306) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
... 6 common frames omitted
I am NOT configuring any custom Executor services. Using the preconfigured Spring Beans. springBootVersion = '2.0.3.RELEASE' springCloudVersion = 'Finchley.RELEASE'
Upvotes: 16
Views: 24666
Reputation: 71
I believe it is a bug or oversight in Spring. The issue stems from the default values of:
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
private static final int DEFAULT_WORKER_THREADS = 2;
and
abstract class AbstractMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;
If no maxNumberOfMessages is set, then it uses 10 as the number of messages to pull from SQS and 2 as the number of workers in the task executor. This means if it pulls 3 or more messages at once, you get that exception. If you manually set maxNumberOfMessages to a value (any value) it will use it both places synchronising the values as i believe is expected:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
SimpleMessageListenerContainerFactory factory, QueueMessageHandler messageHandler)
{
SimpleMessageListenerContainer container = factory.createSimpleMessageListenerContainer();
container.setMaxNumberOfMessages(5);
container.setMessageHandler(messageHandler);
return container;
}
Upvotes: 5
Reputation: 165
Cannot add a comment to the previous answers to further explain why the issue is occurring and why the solution setting MaxNumberOfMessages of messages works. Hopefully, the following helps to clarify everything.
SimpleMessageListenerContainer
's ThreadPoolTaskExecutor
is configured to have a core pool size of 2 threads, a max pool size of 3 threads and a queue capacity of 0. However, the default max number of messages to return on a poll to Amazon SQS is set to 10. Meaning that should 10 messages be available in a single poll there won't be enough threads to process them. Thus the RejectedExecutionException
is thrown.
Configuring the setMaxNumberOfMessages
to 10 on SimpleMessageListenerContainerFactory
sets the max thread pool size to 11, which should allow enough threads to be available. It doesn't set the queue capacity.
To set the queue capacity, a separate TaskExecutor can be initialised and set on the SimpleMessageListenerContainerFactory
bean as follows:
@Bean(name = "sqsAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor(@Value("${external.aws.sqs.core-thread-count}") int coreThreadCount,
@Value("${external.aws.sqs.max-thread-count}") int maxThreadCount,
@Value("${external.aws.sqs.queue-capacity}") int queueCapacity) {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setCorePoolSize(coreThreadCount);
asyncTaskExecutor.setMaxPoolSize(maxThreadCount);
asyncTaskExecutor.setQueueCapacity(queueCapacity);
asyncTaskExecutor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS, @Qualifier("sqsAsyncTaskExecutor") AsyncTaskExecutor asyncTaskExecutor) {
SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
simpleMessageListenerContainerFactory.setTaskExecutor(asyncTaskExecutor);
return simpleMessageListenerContainerFactory;
}
The values I use were coreThreadCount = 5, maxThreadCount = 20, queueCapacity = 10.
As I've already said, I think configuring the setMaxNumberOfMessages to 10 on SimpleMessageListenerContainerFactory should be enough to process all batched messages fetched from a single request. However, if you feel you need more precise control over the TaskExecutor then this configuration works as well.
Upvotes: 2
Reputation: 31
Hey I solved this problem using Spring Listener. Following is the code, hope it helps.
In following solution, once all the beans initialization is finished, than a new task executor with bigger pool size is allocated.
@Component
public class PostBeansConstructionListener{
@EventListener
public void handleContextRefreshedEvent(ContextRefreshedEvent event){
final ApplicationContext applicationContext = event.getApplicationContext();
final SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
setAsyncTaskExecutor(simpleMessageListenerContainer);
}
private void setAsyncTaskExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) {
try{
simpleMessageListenerContainer.setTaskExecutor(getAsyncExecutor());
}catch(Exception ex){
throw new RuntimeException("Not able to create Async Task Executor for SimpleMessageListenerContainer.", ex);
}
}
public AsyncTaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
executor.initialize();
return executor;
}
}
Upvotes: 1
Reputation: 3961
Problem is with the listener thread configuration. See the following
...
ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]]
...
The default thread pool size is less than what you desire.
Add following configuration to your Spring Application
@Configuration
public class TasksConfiguration implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(5); // TODO: Load this from configuration
taskScheduler.initialize();
taskRegistrar.setTaskScheduler(taskScheduler);
}
}
Now, you should be able to process these tasks.
P.S. Whatever tasks were rejected earlier they will be picked up later after the certain period.
Edit: I think people are getting scared by the number in line .setPoolSize(5000)
. It's a configurable number you can choose whatever number suitable for your requirements. For the answer, I'm reducing it to a smaller number.
Upvotes: 3
Reputation: 641
setting the max number of messages seems to solve the issue:
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS){
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSQS);
factory.setMaxNumberOfMessages(10);
return factory;
}
Upvotes: 17