hitchhiker
hitchhiker

Reputation: 1319

How to use synchronization to keep MessageConsumers alive in JMS/ActiveMQ?

I have one MessageProducer and multiple MessageConsumers in ActiveMQ. I want the consumers to wait until the producer has published something; after that, the consumers may be terminated. I'm trying to use Java synchronization to achieve this, but it's not working. I can see that the producer publishes a message, but the consumers do not react to it.

This is my code:

Producer class:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;

/**
 * Thread that publishes a single text message to a topic and then wakes every
 * thread waiting on a shared monitor.
 *
 * <p>The whole publish sequence runs while holding {@code lock}, so it cannot
 * overlap with other code that synchronizes on the same object.
 */
public class TopicProducer extends Thread {
    private final String producerMessage;
    private final Session session;
    private final Topic topic;
    private final Object lock;

    /**
     * @param producerMessage text to publish
     * @param session         JMS session used to create the message and the producer
     * @param topic           destination the message is published to
     * @param lock            monitor that is notified once the message has been sent
     */
    public TopicProducer(String producerMessage, Session session, Topic topic,
                         final Object lock) {
        this.producerMessage = producerMessage;
        this.session = session;
        this.topic = topic;
        this.lock = lock;
    }

    /**
     * Creates the message, sends it, notifies all waiters on the lock and closes
     * the producer. Any failure is printed to standard output; nothing is rethrown.
     */
    @Override
    public void run() {
        try {
            synchronized (this.lock) {
                TextMessage msg = this.session.createTextMessage(this.producerMessage);
                MessageProducer producer = this.session.createProducer(this.topic);
                try {
                    System.out.println("TopicProducer: sending text:" + msg.getText());
                    producer.send(msg);
                    System.out.println("after publish");
                    // Notify before closing so a failure in close() cannot strand the waiters.
                    this.lock.notifyAll();
                } finally {
                    // The producer is only needed for this one send; release it either way.
                    producer.close();
                }
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

Consumer class

import javax.jms.*;

/**
 * Thread that registers a {@code ConsumerMessageListener} on a topic and then
 * waits on a shared monitor until it is notified.
 *
 * <p>The consumer is created and the listener attached while holding {@code lock};
 * {@code wait()} then releases the monitor until another thread notifies it.
 */
public class TopicConsumer extends Thread {
    private final Session session;
    private final Topic topic;
    private final String consumerName;
    private final Object lock;

    /**
     * @param session      JMS session used to create the message consumer
     * @param topic        topic to subscribe to
     * @param consumerName name handed to the {@code ConsumerMessageListener}
     * @param lock         monitor this thread waits on after subscribing
     */
    public TopicConsumer(Session session, Topic topic, String consumerName,
                         final Object lock) {
        this.session = session;
        this.topic = topic;
        this.consumerName = consumerName;
        this.lock = lock;
    }

    /**
     * Subscribes to the topic and blocks until the lock is notified. Failures are
     * printed to standard output; an interrupt additionally restores the thread's
     * interrupt status so code observing this thread can still see it.
     */
    @Override
    public void run() {
        try {
            synchronized (this.lock) {
                MessageConsumer consumer = this.session.createConsumer(this.topic);
                consumer.setMessageListener(new ConsumerMessageListener(this.consumerName));
                // NOTE(review): this wait has no guard condition. A notifyAll() issued
                // before this thread reaches wait() is missed, and a spurious wakeup ends
                // the wait early. A shared flag or latch would be needed to make it robust.
                this.lock.wait();
            }
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: put the flag back before reporting it.
            Thread.currentThread().interrupt();
            System.out.println(e);
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

Connection creator class:

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.BasicConfigurator;

/**
 * Sets up an ActiveMQ connection, a session, a queue and a topic via JNDI, and
 * hosts the demo {@link #main(String[])} that runs one producer thread and three
 * consumer threads against the topic.
 *
 * <p>Construction never throws: if setup fails the exception is printed and the
 * getters return {@code null} for whatever had not been created yet.
 */
public class InitConnection {
    public static String QUEUE_NAME = "MyQueue";
    public static String ACTIVEMQ_INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
    public static String ACTIVEMQ_PROVIDER_URL = "tcp://localhost:61616";
    public static String CONN_FACTORY = "ConnectionFactory";
    public static String TOPIC = "someTopic";

    private ActiveMQConnection connection;
    private ActiveMQQueue queue;
    private Session session;
    private Topic topic;

    /** Runs the setup and prints (rather than propagates) any failure. */
    public InitConnection() {
        try {
            this.init();
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * Looks up the connection factory through JNDI, opens and starts the
     * connection, and creates the session, queue and topic.
     *
     * @throws JMSException    if the connection, session or topic cannot be created
     * @throws NamingException if the JNDI context or lookup fails
     */
    private void init() throws JMSException, NamingException {
        // Obtain a JNDI connection
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, ACTIVEMQ_INITIAL_CONTEXT_FACTORY);
        props.setProperty(Context.PROVIDER_URL, ACTIVEMQ_PROVIDER_URL);
        InitialContext jndiContext = new InitialContext(props);

        // Look up a JMS connection factory; the context is only needed for this lookup
        ActiveMQConnectionFactory conFactory;
        try {
            conFactory = (ActiveMQConnectionFactory) jndiContext.lookup(CONN_FACTORY);
        } finally {
            jndiContext.close();
        }

        // Getting JMS connection from the server and starting it
        this.connection = (ActiveMQConnection) conFactory.createConnection();
        this.connection.start();
        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        this.session = this.newSession();

        this.queue = new ActiveMQQueue(QUEUE_NAME);
        this.topic = session.createTopic(TOPIC);
    }

    /** Creates a fresh non-transacted, auto-acknowledge session on the connection. */
    private Session newSession() throws JMSException {
        return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public ActiveMQConnection getConnection() {
        return connection;
    }

    public ActiveMQQueue getQueue() {
        return queue;
    }

    public Session getSession() {
        return session;
    }


    public Topic getTopic() {
        return topic;
    }

    /**
     * Waits for every given thread to finish. If the waiting thread is
     * interrupted, the interrupt status is restored and the wait is abandoned.
     */
    private void joinThreads(Thread[] threads) {
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(e);
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /** Closes the connection if one was opened; failures are printed. */
    private void closeConnection() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * Demo entry point: starts three consumers and one producer on the topic,
     * waits for all four threads to finish, and only then closes the connection.
     */
    public static void main(String[] args) {
        BasicConfigurator.configure(); //logs config
        InitConnection conn = new InitConnection();
        if (conn.getTopic() == null) {
            // Setup failed (already reported by the constructor); release whatever was opened.
            conn.closeConnection();
            return;
        }
        final Object lock = new Object();

        try {
            // A JMS Session is a single-threaded context, so every worker thread gets
            // its own: the producer uses the session created during setup, each
            // consumer a new one. The Topic object itself is shared.
            TopicProducer tp = new TopicProducer("producerMessage",
                    conn.getSession(), conn.getTopic(), lock);
            TopicConsumer tc1 = new TopicConsumer(conn.newSession(),
                    conn.getTopic(), "consumer1", lock);
            TopicConsumer tc2 = new TopicConsumer(conn.newSession(),
                    conn.getTopic(), "consumer2", lock);
            TopicConsumer tc3 = new TopicConsumer(conn.newSession(),
                    conn.getTopic(), "consumer3", lock);

            tc1.start();
            tc2.start();
            tc3.start();
            tp.start();

            // Keep the connection open until the workers are done; closing it right
            // after start() would tear it down while they are still using it.
            // NOTE(review): start order does not guarantee the consumers are already
            // waiting when the producer notifies; if a consumer blocks forever, so does
            // this join.
            conn.joinThreads(new Thread[] {tc1, tc2, tc3, tp});
        } catch (JMSException e) {
            System.out.println(e);
        } finally {
            conn.closeConnection();
        }

    }
}

Upvotes: 0

Views: 1182

Answers (1)

Maciej Miklas
Maciej Miklas

Reputation: 3330

Do not use thread synchronisation for that - it's completely wrong.

You have implemented the consumer as a listener, which is asynchronous. Instead of using a listener, use the receive method: https://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQMessageConsumer.html#receive--

This method blocks until a message is sent; it then receives that message and execution continues.

Upvotes: 1

Related Questions