Reputation: 1446
We are using ActiveMQ as our message broker.
For one of the queues we found that rate at which message is produced is far higher than the rate at which messages are consumed. This sometimes results in ActiveMQ crashing.
So we are exploring the options to increase the consumption rate. (First priority is increasing rate of existing consumer and then increase number of consumer pods/instances).
Following is the current listener config
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-1");
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
The listener logic interacts with a database multiple times and can be qualified as I/O bound task. So we are thinking of processing multiple messages in parallel.
We found following options.
We have set concurreny to 1-1
implying that
concurrentConsumers
and maxConcurrentConsumers
are 1 and only 1 message is processed at a time. This is the configuration we can increase to say 5-10
so that min 5 and max 10 consumers will be able to operate concurrently.
We also found that we can also set TaskExecutor
in listener factory. Like setting a threadPoolExecutor (corePoolSize = 5, maxPoolSize = 10, queueCapacity=50)
will also help us to concurrently process messages.
I'm unsure as which option to employ (1 or 2)
Upvotes: 4
Views: 2005
Reputation: 1045
I would agree with the previous answers.
When you have poor message consumption performance with the Spring DefaultMessageListenerContainer (DMLC), this almost always means that the JMS consumer/session/connection is being recreated for every message read.
In the non-transacted case, you can have the DMLC cache the consumer. The DMLC will use a single JMS connection, and start multiple JMS sessions on the connection. Each session will have a JMS message consumer.
In the transacted case, you will have no caching in the DMLC, and have to use a caching connection factory in order to avoid the performance problem. Look at the org.apache.activemq.pool.PooledConnectionFactory
or the org.messaginghub.pooled.jms.JmsPoolConnectionFactory
for this functionality.
As for your questions, I also would not bother with the TaskExecutor
. I would recommend a concurrency of 5-5
to start with. I have found that having the min and max number of consumers be the same (just having a nailed up pool) has advantages in stability and performance.
As to ActiveMQ crashing, this is due to the ActiveMQ prefetch. By default, ActiveMQ has a prefetch value of 1000. The JMS message consumer asks for a message, and the broker delivers the message and 999 others in the prefetch. If the consumer/session is then closed, the 999 messages are discarded on the client side, and then re-queued on the broker. This is very abusive for the broker and is not handled all that well.
Also, be aware that if you have 5 concurrent consumers, for example, then the first consumer will get 1000 messages, then the next will get 1000 messages and so on. So if you only have 500 messages in the queue, then only one consumer will be active. You would need to have 5000 messages in the queue to activate all 5 consumers.
When in doubt, disable the message prefetch in the client configuration:
tcp://broker_uri:61616?jms.prefetchPolicy.all=0
Upvotes: 5
Reputation: 4316
What is your observed throughput in msg/s and MB/s?
In my experience, you'll observe the fastest throughput will be observed with batching transactions over a PooledConnectionFactory using straight JMS API. Spring JMS Template can be difficult to configure and tune, esp w/ multi-threading and transactions. It also can close objects like the Consumer and Session on per-message which eliminates the benefit of the broker-side cache and prefetch.
When you go straight JMS-API and batched transactions (JMS, not necassarily XA) you'll get the benefit of the ActiveMQ broker's server-side cache and prefetch kicking in-- which makes a HUGE difference.
Upvotes: 1
Reputation: 35038
The JavaDoc for DefaultMessageListenerContainer.setTaskExecutor()
says this:
Set the Spring
TaskExecutor
to use for running the listener threads.Default is a
SimpleAsyncTaskExecutor
, starting up a number of new threads, according to the specified number of concurrent consumers.Specify an alternative
TaskExecutor
for integration with an existing thread pool. Note that this really only adds value if the threads are managed in a specific fashion, for example within a Java EE environment. A plain thread pool does not add much value, as this listener container will occupy a number of threads for its entire lifetime.
So, the main use-case for setting the task executor is for integration with an existing thread pool. The default executor will already scale according to the number of concurrent consumers you configure. Therefore, I would not recommend you set the task executor.
I would recommend simply setting the concurrency.
You'll need to determine your optimal values through careful benchmarking and analysis. Before you start that process I would strongly encourage you to set a specific performance goal because without a goal benchmarking and optimization can become endless tasks.
You might also consider using flow control for the producers so that the queues don't get so full in the first place.
Upvotes: 2