Tuco
Tuco

Reputation: 513

JMS 2.0 - MQ 9 - Topic Shared subscription doesn't work

I'm facing problem when developing an application that subscribe a MQ Topic (MQ version 9).

I need to do a shared topic connection because the application will be ran in multiple instances (cluster).

The specs and the documentation says : "A non-durable shared subscription is used by a client which needs to be able to share the work of receiving messages from a topic subscription amongst multiple consumers. A non-durable shared subscription may therefore have more than one consumer. Each message from the subscription will be delivered to only one of the consumers on that subscription."

For me, all the clients using the same subscription name are in the same "cluster", only one client will receive a message at one time.

In my code, inspired by this article, I've got an exception when the second client try to create the shared subscription. I really don't understand if this is a bug in MQ client libraries implementation or in my code.

Here my sample code :

    import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

public class TestGB2 {

    public static void main(final String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {
            new Thread(new MyThread("THREAD" + i, "TESTSUB/#", "myClient", "SUBTEST")).start();
        }

    }

    public static class MyThread implements Runnable {

        private final String topicString;
        private final String clientId;
        private final String subscriptionName;

        public MyThread(final String threadName, final String topicString, final String clientId, final String subscriptionName) {
            Thread.currentThread().setName(threadName);
            this.topicString = topicString;
            this.clientId = clientId;
            this.subscriptionName = subscriptionName;
        }

        @Override
        public void run() {

            try {
                System.out.println(String.format("%s : Connecting...", Thread.currentThread().getName()));
                MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
                cf.setHostName("xxxx");
                cf.setPort(1416);
                cf.setQueueManager("xxxx");
                cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
                cf.setChannel("xxx");
                cf.setClientID(clientId);

                Connection con = cf.createConnection();

                Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                con.start();
                Topic topic = session.createTopic(topicString);
                MessageConsumer messageConsumer = session.createSharedConsumer(topic, subscriptionName); // fail here

                System.out.println(String.format("%s : Waiting for a message...", Thread.currentThread().getName()));
                Message msg = messageConsumer.receive();
                System.out.println(String.format("%s : Received :\n%s", Thread.currentThread().getName(), msg));

            }
            catch (Exception ex) {
                System.out.println(String.format("%s : FAILED", Thread.currentThread().getName()));
                ex.printStackTrace();
            }

        }

    }
}

The code below tries to create 10 threads consuming messages on the same topic. Only the first thread is able to connect, all the others fail with following exception :

    com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0026: Failed to subscribe to topic 'TESTSUB' with selector 'none' using MQSUB.
There may have been a problem creating the subscription due to it being used by another message consumer.
Make sure any message consumers using this subscription are closed before trying to create a new subscription under the same name. Please see the linked exception for more information.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:472)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:214)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:212)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:112)
    at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1038)
    at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:134)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:470)
    at com.ibm.msg.client.wmq.internal.WMQSession.createSharedConsumer(WMQSession.java:938)
    at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4228)
    at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4125)
    at com.ibm.mq.jms.MQSession.createSharedConsumer(MQSession.java:1319)
    at TestGB.lambda$0(TestGB.java:33)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2042' ('MQRC_OBJECT_IN_USE').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:202)
    ... 11 more

Tried with the last lib :

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.1.0</version>
</dependency>

Upvotes: 1

Views: 2393

Answers (1)

JoshMc
JoshMc

Reputation: 10642

Summary of the issue

The issue is not with your program, the issue is with the model queue associated to the topic you are subscribing to.

On the queue manager if you look at the topic object that your subscription will match, it will have a parameter MNDURMDL that points to a model queue.

If you look at the model queue you will note two parameters where either or both can cause the error you are receiving:

[ DEFSOPT( EXCL | SHARED ) ]
[ SHARE | NOSHARE ]

These must be set to DEFSOPT(SHARED) and SHARE. If either one is set to the other value you will only be able to have one subscriber on the shared subscription.


Additional details of the cause of the issue

With IBM MQ Pub/Sub, when you create a JMS subscription MQ treats this as a managed subscription, in the background IBM MQ will create a temporary queue to subscribe to the topic string. If it is a non-durable subscription the queue is a temporary dynamic queue.

The reason for the failure is that the first thread will open the temporary dynamic queue in an exclusive mode, any other threads then cannot open the temporary dynamic queue and you receive the MQRC_OBJECT_IN_USE error.


Possible cause where an application specific MNDURMDL model queue was created

I suspect the cause of this is that IBM comes with a few different default model queues.

The default for a non-durable subscriber has these settings:

   QUEUE(SYSTEM.NDURABLE.MODEL.QUEUE)      TYPE(QMODEL)
   DEFSOPT(SHARED)                         SHARE

There is another default queue that is not pub/sub specific that has these settings:

   QUEUE(SYSTEM.DEFAULT.MODEL.QUEUE)       TYPE(QMODEL)
   DEFSOPT(EXCL)                           NOSHARE

It is likely that the model queue created for use by your topic object was created with a command like the following that will default to use the setting of the SYSTEM.DEFAULT.MODEL.QUEUE.:

DEFINE QMODEL(xxx)

In the future you could either specifically set those two parameters, or define it with the LIKE keyword to force it to use a different queue to model settings from, both commands are below:

DEFINE QMODEL(xxx) DEFSOPT(SHARED) SHARE
DEFINE QMODEL(xxx) LIKE(SYSTEM.NDURABLE.MODEL.QUEUE)

Additional details on creation and usage of application specific TOPIC objects and MODEL queues

By default the root node of the tree is represented by the standard TOPIC object named SYSTEM.BASE.TOPIC, the default model queues associated to this TOPIC are shown below:

   TOPIC(SYSTEM.BASE.TOPIC)                TYPE(LOCAL)
   TOPICSTR()                              MDURMDL(SYSTEM.DURABLE.MODEL.QUEUE)
   MNDURMDL(SYSTEM.NDURABLE.MODEL.QUEUE)

If you do not define any further administrative TOPIC objects, then all topics match against SYSTEM.BASE.TOPIC. Additionally if you do not define any further administrative TOPIC objects and you want to give an application permission to a specific subset of the topic tree (for example topic strings beginning with TESTSUB) you must grant the permissions via SYSTEM.BASE.TOPIC, this in turn grants the application access for any arbitrary topic string with no restrictions.

Best practice would be to create a TOPIC object with a topic string that matches the portion of the topic tree an an application should have access to. Specific to your example of TESTSUB/# if your admin defined a new TOPIC object and specified the TOPICSTR(TESTSUB), the defaults would create it like this:

   TOPIC(TESTSUB.TOPIC)                    TYPE(LOCAL)
   TOPICSTR(TESTSUB)                       MDURMDL( )
   MNDURMDL( )

the blank MDURMDL and MNDURMDL values tell MQ to use the value from the next closest higher topic object in the tree, if nothing else is defined this would be the SYSTEM.BASE.TOPIC and the model queues would still default to using the SYSTEM.DURABLE.MODEL.QUEUE and SYSTEM.NDURABLE.MODEL.QUEUE model queues.

The admin can instead create the TOPIC object and specify different model queues for example:

   TOPIC(TESTSUB.TOPIC)                    TYPE(LOCAL)
   TOPICSTR(TESTSUB)                       MDURMDL(TESTSUB.DURABLE.MODEL.QUEUE)
   MNDURMDL(TESTSUB.NDURABLE.MODEL.QUEUE)

By doing this they can define application specific model queues that have the settings required for shared subscriptions and not impact the SYSTEM model queues. The other benefit is they can provide the application permissions for only topic strings that start with TESTSUB, for example TESTSUB/A or TESTSUB/B or TESTSUB/X/Y/Z.

Upvotes: 2

Related Questions