Felipe Giotto
Felipe Giotto

Reputation: 13

Unable to process messages concurrently in MDB

I'm trying to create a simple MDB with EJB annotations so I could execute tasks asynchronously.

There are thousands of slow tasks to execute and the hardware has many processors and RAM, so I need to run them in a concurrent way (in many threads). It works this way in the beginning (in many threads), but after some messages it "scales down" and process only one message at a time.

These are some relevant information:

  1. I'm using Java 1.8.0_221 and WildFly 19.1.0.

  2. This is my MDB consumer:

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageConsumerTest implements MessageListener {
  (...)
}
  1. This consumer has some injected dependencies. Most of them have a @Stateless annotation, some have a @Singleton annotation. For the singleton dependencies, all of them have the @ConcurrencyManagement(ConcurrencyManagementType.BEAN) annotation.

  2. The application datasources, declared in the standalone-full.xml, have a jta="false" parameter.

  3. My queue is non-persistent (if WildFly stops, the producer check and resend all the pending messages again), so this is what I do in my producer:

@Inject
@JMSConnectionFactory("java:jboss/DefaultJMSConnectionFactory")
private JMSContext context;

@Resource(mappedName = "java:/jms/queue/MessageQueue")
private Queue queue;

(...)

context.createProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT).send(queue, msg);

I tried to change a lot of things (pool sizes in standalone-full.xml, @ActivationConfigProperty annotations in the MDB consumer, -D.... parameters), but none of them worked. The result is always the same: the MDB starts processing many objects concurrently, but drops to one.

Whats the correct way to change this behavior and/or make any deeper analysis?

Thanks in advance!

MORE INFO: I tried to check the queue running commands in the "jboss-cli" app, and this is the result:

At the beginning, there are 3 "ServerConsumer" (this is the expected behavior):

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
  consumerName=ServerConsumer [id=f3e141c0-b56d-11ea-8030-b8aeed89da5a:f3e1b6f2-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378547L
      expiration=0
      type=3
      priority=4
      userID=ID:227e8bd1-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929521797L
  consumerName=ServerConsumer [id=f3ea1b75-b56d-11ea-8030-b8aeed89da5a:f3ea4287-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378523L
      expiration=0
      type=3
      priority=4
      userID=ID:f63d0c39-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447548L
  consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378520L
      expiration=0
      type=3
      priority=4
      userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447299L

After some time, there are only 2 ServerConsumer instance, but it seems to be two messages to one of them:

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
  consumerName=ServerConsumer [id=f3d29bb3-b56d-11ea-8030-b8aeed89da5a:f3d66c45-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378580L
      expiration=0
      type=3
      priority=4
      userID=ID:6a77afd0-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929642548L
  consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378520L
      expiration=0
      type=3
      priority=4
      userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447299L
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378565L
      expiration=0
      type=3
      priority=4
      userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929552548L

After some more time, there's only one ServerConsumer instance, and it seems to be handling all the messages. At this point, concurrency is gone!

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
  consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
  elements
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378520L
      expiration=0
      type=3
      priority=4
      userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
      timestamp=1592929447299L
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378565L
      expiration=0
      type=3
      priority=4
      userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929552548L
      durable=false
      address=jms.queue.MessageQueue
      __AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
      _AMQ_ROUTING_TYPE=1
      messageID=253403378610L
      expiration=0
      type=3
      priority=4
      userID=ID:9751c5ea-b56e-11ea-8030-b8aeed89da5a
      timestamp=1592929717797L

Upvotes: 0

Views: 375

Answers (1)

Justin Bertram
Justin Bertram

Reputation: 35152

I believe this is due to message buffering in the JMS sessions underlying the MDB. Try setting the consumerWindowSize activation configuration property to 0, e.g.:

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "consumerWindowSize", propertyValue = "0")
})
public class MessageConsumerTest implements MessageListener {
  (...)
}

This setting is discussed further in the ActiveMQ Artemis documentation.

Upvotes: 1

Related Questions