r3mbol
r3mbol

Reputation: 261

How to create a durable subscriber for topic on Jboss 5?

I'm trying to write a topic with durable subscriber. I got my basic topic:

<?xml version="1.0" encoding="UTF-8"?>
<server>
    <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=durableTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
</mbean>  
</server>

And I got my subscribing MDB:

@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/durableTopic"),
@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable") })
public class DurableSubscriberOne implements MessageListener {
// ...

But when I enter jmx-console or admin-console I see my topic with one non-durable subscription and no durable subscriptions.

Am I making some typo or some minor mistake, or is it more tricky than that? I'm using JBoss 5.1.0.GA.

Upvotes: 0

Views: 5899

Answers (2)

Torsten R&#246;mer
Torsten R&#246;mer

Reputation: 3926

Having had the same problem, I eventually managed to make the MDB create a durable subscription by adding two more @ActivationConfigProperty:

@ActivationConfigProperty(propertyName = "subscriptionName", propertyValue ="SomeSubscriptionName")
@ActivationConfigProperty(propertyName = "clientId", propertyValue ="SomeClientId")

Upvotes: 1

uaarkoti
uaarkoti

Reputation: 3657

I really can't find any error. The code that simply works for me. I am including my code sample and also the screenshot for your reference.

package com.jboss.example;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

/**
 * Message-Driven Bean implementation class for: DurableMessageListener
 * 
 */
@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/durableTopic"),
        @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable") })
//, mappedName = "durableTopic")
public class DurableMessageListener implements MessageListener {

    /**
     * Default constructor.
     */
    public DurableMessageListener() {
        // TODO Auto-generated constructor stub
    }

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        // TODO Auto-generated method stub
        System.out.println("Received Message " + message);
    }

}

Topic Subscriber sample

package com.jboss.example;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class DurableTopicSubscriber {
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public void setupPubSub() throws JMSException, NamingException {

        Properties env = new Properties();
        env.setProperty("java.naming.factory.initial",
                "org.jnp.interfaces.NamingContextFactory");
        env.setProperty("java.naming.factory.url.pkgs",
                "org.jboss.naming:org.jnp.interfaces");
        env.setProperty("java.naming.provider.url", "jnp://localhost:1099");
        InitialContext iniCtx = new InitialContext(env);
        Object tmp = iniCtx.lookup("ConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("guest", "guest");
        conn.setClientID("Dirabla");
        topic = (Topic) iniCtx.lookup("topic/durableTopic");

        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void recvSync() throws JMSException, NamingException {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();
        // Wait upto 5 seconds for the message
        TopicSubscriber recv = session.createSubscriber(topic);
        //TopicSubscriber recv = session.createDurableSubscriber(topic, "durableTopicName");
        Message msg = recv.receive(5000);
        while (msg != null) {
            System.out.println("DurableTopicClient.recv, msgt=" + msg);
            msg = recv.receive(5000);
        }
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin DurableTopicRecvClient, now="
                + System.currentTimeMillis());
        DurableTopicSubscriber client = new DurableTopicSubscriber();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }

}

Topic Sample Publisher

package com.jboss.example;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class DurableTopicPublisher {
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public void setupPubSub() throws JMSException, NamingException {

        Properties env = new Properties();
        env.setProperty("java.naming.factory.initial",
                "org.jnp.interfaces.NamingContextFactory");
        env.setProperty("java.naming.factory.url.pkgs",
                "org.jboss.naming:org.jnp.interfaces");
        env.setProperty("java.naming.provider.url", "jnp://localhost:1099");
        InitialContext iniCtx = new InitialContext(env);
        Object tmp = iniCtx.lookup("ConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("guest", "guest");
        conn.setClientID("Dirabla");
        topic = (Topic) iniCtx.lookup("topic/durableTopic");

        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    }

    public void recvSync() throws JMSException, NamingException {
        System.out.println("Begin recvSync");
        setupPubSub();
        TopicPublisher topicPublisher = session.createPublisher(topic);

        Message message = session.createMessage();

        for (int i = 0; i < 10; i++) {
            message.setIntProperty("id", i);
            topicPublisher.publish(message);
        }
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin DurableTopicRecvClient, now="
                + System.currentTimeMillis());
        DurableTopicPublisher client = new DurableTopicPublisher();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }

}

Topic declaration is the same as yours

<?xml version="1.0" encoding="UTF-8"?>
<server>
    <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=durableTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
</mbean>  
</server>

Screenshot

enter image description here

Upvotes: 0

Related Questions