javaTry

Reputation: 1323

DefaultJmsListenerContainerFactory configure process concurrency

I am configuring concurrency for my JmsFactory in order to consume my queues.

This is my configuration:

  @Bean(name = "myFactory")
  public DefaultJmsListenerContainerFactory sqsFactory(SQSConnectionFactory connectionFactory,
      CustomJmsListenerConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("1-3");
    return factory;
  }

I saw that DefaultJmsListenerContainerFactory.setConcurrency calls DefaultMessageListenerContainer behind the scenes.

I am using Spring Boot and have two queues configured in my application:

@JmsListener(destination = "queue1", containerFactory = "myFactory")
@JmsListener(destination = "queue2", containerFactory = "myFactory")

I was reading the Spring documentation and came across some methods, and now I have some questions.

1 - What is the difference between:

setConcurrency(String concurrency)
setConcurrentConsumers(int concurrentConsumers)

Even after reading the documentation, I do not understand the difference or how this configuration changes the application's behavior. I think setConcurrency should be the number of threads that each @JmsListener will use to get messages from its queue. Could you explain an example configuration, assuming I have 100 messages queued on each configured queue?

2 - setMaxMessagesPerTask(int maxMessagesPerTask)

If I have 100 messages on the queue, concurrency = 3, and this number is 10 (default), what is the behavior?

Upvotes: 0

Views: 6626

Answers (1)

Gary Russell

Reputation: 174729

Read the Javadocs in both cases.

1.

setConcurrency(String concurrency)

This is just a convenience method:

/**
 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
 * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
 * <p>This listener container will always hold on to the minimum number of consumers
 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
 */
@Override
public void setConcurrency(String concurrency) {
    try {
        int separatorIndex = concurrency.indexOf('-');
        if (separatorIndex != -1) {
            setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
            setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
        }
        else {
            setConcurrentConsumers(1);
            setMaxConcurrentConsumers(Integer.parseInt(concurrency));
        }
    }
    catch (NumberFormatException ex) {
        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
                "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
    }
}

setConcurrency("1-3") is the same as

setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
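
To make the mapping concrete, here is a minimal standalone sketch that mirrors the parsing logic quoted above without depending on Spring. The class name ConcurrencyRange and its fields are hypothetical and exist only for illustration; the real container stores these values through its own setters.

```java
// Hypothetical helper that mirrors the "lower-upper" parsing of setConcurrency(String).
class ConcurrencyRange {
    final int min; // corresponds to setConcurrentConsumers
    final int max; // corresponds to setMaxConcurrentConsumers

    ConcurrencyRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    static ConcurrencyRange parse(String concurrency) {
        int separatorIndex = concurrency.indexOf('-');
        if (separatorIndex != -1) {
            // "1-3" -> lower limit 1, upper limit 3
            return new ConcurrencyRange(
                    Integer.parseInt(concurrency.substring(0, separatorIndex)),
                    Integer.parseInt(concurrency.substring(separatorIndex + 1)));
        }
        // A bare number is only an upper limit; the lower limit falls back to 1.
        return new ConcurrencyRange(1, Integer.parseInt(concurrency));
    }

    public static void main(String[] args) {
        ConcurrencyRange range = parse("1-3");
        System.out.println("min=" + range.min + ", max=" + range.max); // prints "min=1, max=3"
    }
}
```

Since the factory creates a separate listener container for each @JmsListener endpoint, the "1-3" range should apply to queue1 and queue2 independently: each queue starts with one consumer and can scale up to three under load.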

2. There is no impact on message processing; it just means that the consumer task is recycled (stopped and started) every 10 messages.

/**
 * Specify the maximum number of messages to process in one task.
 * More concretely, this limits the number of message reception attempts
 * per task, which includes receive iterations that did not actually
 * pick up a message until they hit their timeout (see the
 * {@link #setReceiveTimeout "receiveTimeout"} property).
 * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
 * reusing the original invoker threads until shutdown (at the
 * expense of limited dynamic scheduling).
 * <p>In case of a SchedulingTaskExecutor indicating a preference for
 * short-lived tasks, the default is 10 instead. Specify a number
 * of 10 to 100 messages to balance between rather long-lived and
 * rather short-lived tasks here.
 * <p>Long-lived tasks avoid frequent thread context switches through
 * sticking with the same thread all the way through, while short-lived
 * tasks allow thread pools to control the scheduling. Hence, thread
 * pools will usually prefer short-lived tasks.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 * @see #setTaskExecutor
 * @see #setReceiveTimeout
 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
 */
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
    Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
    synchronized (this.lifecycleMonitor) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }
}
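
The effect of this limit can be illustrated with a toy model. The sketch below is not Spring code and does not reproduce the container's real scheduling; TaskRecyclingSketch and drain are hypothetical names, and the model assumes a single consumer and a queue that is never empty while messages remain (so no receive attempts time out). It only shows that the limit changes how many tasks are run, not how many messages are processed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of one consumer whose task is recycled after maxMessagesPerTask receive attempts.
class TaskRecyclingSketch {

    // Returns {messagesProcessed, tasksRun}. A negative limit means "unlimited", as with -1.
    static int[] drain(int queued, int maxMessagesPerTask) {
        if (maxMessagesPerTask == 0) {
            throw new IllegalArgumentException("'maxMessagesPerTask' must not be 0");
        }
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < queued; i++) {
            queue.add(i);
        }
        boolean unlimited = maxMessagesPerTask < 0;
        int processed = 0;
        int tasks = 0;
        while (!queue.isEmpty()) {
            tasks++; // a new task is scheduled on the executor
            int attempts = 0;
            while ((unlimited || attempts < maxMessagesPerTask) && !queue.isEmpty()) {
                queue.poll(); // "receive" and process one message
                processed++;
                attempts++;
            }
            // The task ends here; the next loop iteration models rescheduling it.
        }
        return new int[] {processed, tasks};
    }

    public static void main(String[] args) {
        int[] result = drain(100, 10);
        System.out.println(result[0] + " messages in " + result[1] + " tasks"); // prints "100 messages in 10 tasks"
    }
}
```

With a limit of -1 the same 100 messages are handled by a single long-lived task, which matches the Javadoc's description of the default for a standard TaskExecutor.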

Upvotes: 6
