Vijaykumar Arumugam

Reputation: 165

message-driven-channel-adapter - concurrent processing of messages

Issue

We have a legacy application with the following message-driven-channel-adapter configuration:

<jms:message-driven-channel-adapter 
    id="processRequest"        
    destination="requestFromLoader"
    connection-factory="connectionFactory"
    max-concurrent-consumers="30"
    message-converter="xmlMarshalConverter"
    channel="jmsInChannel"
    error-channel="errorChannel"/>

With max-concurrent-consumers="30" we expected 30 parallel consumers (processRequest-container-1 to processRequest-container-30) to consume the messages. During testing, however, we found that each consumer in turn consumes 10 messages. For example, processRequest-container-1 processes messages 1 to 10, then processRequest-container-2 is created and processes messages 11 to 20, and so on.

If there are only 2 requests, the 2nd message has to wait even though 29 other consumers are available.

Resolution

We added max-messages-per-task="1". This seems to yield the correct result, but we are still testing with large volumes of messages.

<jms:message-driven-channel-adapter 
    id="processRequest"        
    destination="requestFromLoader"
    connection-factory="connectionFactory"
    max-concurrent-consumers="30"
    max-messages-per-task="1"
    message-converter="xmlMarshalConverter"
    channel="jmsInChannel"
    error-channel="errorChannel"/>

Questions

  1. We noticed that the consumer name in the logs (processRequest-container-1) goes up to 2000 or 3000 (for example, processRequest-container-2546) and is not restricted to processRequest-container-1 through processRequest-container-30. In the first case, without max-messages-per-task="1", it never went beyond processRequest-container-30. Is this normal, or do we need to do something about it?

  2. In the Spring documentation, max-messages-per-task="1" is described as a task being performed per message. What is a task, and how does this setting affect the maximum number of parallel consumers?

  3. Is adding max-messages-per-task="1" correct, or are we missing or overlooking something else?

Any suggestions or comments are welcome. Thanks in advance.

Upvotes: 0

Views: 96

Answers (1)

Artem Bilan

Reputation: 121177

This question is more about Spring JMS than Spring Integration.

It is probably better to read the source code and Javadocs of the DefaultMessageListenerContainer, which is used by the mentioned <jms:message-driven-channel-adapter>.

There is an interesting Javadoc comment which might explain the behavior you are seeing:

/**
 * Specify the maximum number of concurrent consumers to create. Default is 1.
 * <p>If this setting is higher than "concurrentConsumers", the listener container
 * will dynamically schedule surplus consumers at runtime, provided that enough
 * incoming messages are encountered. Once the load goes down again, the number of
 * consumers will be reduced to the standard level ("concurrentConsumers") again.
 * <p>Raising the number of concurrent consumers is recommendable in order
 * to scale the consumption of messages coming in from a queue. However,
 * note that any ordering guarantees are lost once multiple consumers are
 * registered. In general, stick with 1 consumer for low-volume queues.
 * <p><b>Do not raise the number of concurrent consumers for a topic,
 * unless vendor-specific setup measures clearly allow for it.</b>
 * With regular setup, this would lead to concurrent consumption
 * of the same message, which is hardly ever desirable.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 * @see #setConcurrentConsumers
 */
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {

So, since you use max-concurrent-consumers="30" rather than concurrent-consumers="30", it is normal that you got up to 30 dynamically created instances, whose ids were indeed incremented.
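
For comparison, here is a minimal sketch of the alternative mentioned above. It assumes the adapter's concurrent-consumers attribute maps to the container's setConcurrentConsumers, and it reuses the bean names from the question. The value 30 is only illustrative, and I have not tested this against your broker:

```xml
<!-- Sketch only, not a verified fix.
     concurrent-consumers: the standard level of consumers the container keeps.
     max-concurrent-consumers: the upper bound for dynamically added consumers. -->
<jms:message-driven-channel-adapter
    id="processRequest"
    destination="requestFromLoader"
    connection-factory="connectionFactory"
    concurrent-consumers="30"
    max-concurrent-consumers="30"
    message-converter="xmlMarshalConverter"
    channel="jmsInChannel"
    error-channel="errorChannel"/>
```

With both values equal, the container should have no surplus consumers to scale up or down. Whether that suits your load profile is something you would need to confirm in your own tests.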

If you don't specify max-messages-per-task="1", the logic is like this:

        if (this.taskExecutor instanceof SchedulingTaskExecutor ste && ste.prefersShortLivedTasks()) {
            if (this.maxMessagesPerTask == Integer.MIN_VALUE) {
                // TaskExecutor indicated a preference for short-lived tasks. According to
                // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
                // unless the user specified a custom value.
                this.maxMessagesPerTask = 10;
            }
        }

Otherwise, your taskExecutor is not a SchedulingTaskExecutor (which is indeed the case by default), and therefore all your messages are handled by the same consumer instance.

Some docs are here as well: https://docs.spring.io/spring-framework/reference/integration/jms/using.html#jms-mdp

Upvotes: 0
