user2853437
user2853437

Reputation: 780

How do I start a threaded session in ActiveMQ 5.x?

This is probably a basic question, but will current_session run in its own thread in the code below?

ActiveMQSession current_session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Thread sessionThreads = new Thread(current_session);
sessionThreads.start();

If not, could someone please show me a code example of how to run a session in its own thread?

What I want is concurrent sessions, each with producers/consumers that write to or listen on their own specific queues. I already tried writing a custom Thread and passing the connection to it, but when I created producers I ran into an error saying that I can't start a producer on an unregistered session.

Upvotes: 0

Views: 47

Answers (1)

Justin Bertram
Justin Bertram

Reputation: 34998

There is no constructor for java.lang.Thread that will accept an org.apache.activemq.ActiveMQSession object. Your code won't even compile, let alone run.

Here's a simple client that will create threads for producing and consuming messages:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * Minimal JMS client that produces and consumes one message on separate threads.
 *
 * <p>A single {@link Connection} is shared by both worker threads, while each
 * worker creates (and closes) its own {@link Session} inside its {@code run()}
 * method, so no session is ever touched by more than one thread.
 */
public class MyMultiThreadedApp {

   /**
    * Worker thread that waits up to five seconds for a single message on the
    * given destination and prints the outcome to standard output.
    */
   class MyConsumer extends Thread {
      private final Connection connection;
      private final Destination destination;

      /**
       * @param connection  shared connection used to create this thread's session
       * @param destination destination to receive from
       */
      MyConsumer(Connection connection, Destination destination) {
         this.connection = connection;
         this.destination = destination;
      }

      /**
       * Creates a session and consumer, starts the connection, and performs one
       * timed receive. Failures are reported on standard error and end the thread.
       */
      @Override
      public void run() {
         // The session is created here, on the worker thread, and closed
         // automatically when the try block exits.
         try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
            MessageConsumer messageConsumer = session.createConsumer(destination);
            // Message delivery does not begin until the connection is started.
            connection.start();
            Message message = messageConsumer.receive(5000);
            if (message == null) {
               System.out.println("Did not receive message within the allotted time.");
               return;
            }
            System.out.println("Received message: " + message);
         } catch (Exception e) {
            // Catch Exception rather than Throwable so JVM Errors are not swallowed.
            e.printStackTrace();
         }
      }
   }

   /**
    * Worker thread that sends a single text message to the given destination.
    */
   class MyProducer extends Thread {
      private final Connection connection;
      private final Destination destination;

      /**
       * @param connection  shared connection used to create this thread's session
       * @param destination destination to send to
       */
      MyProducer(Connection connection, Destination destination) {
         this.connection = connection;
         this.destination = destination;
      }

      /**
       * Creates a session and producer and sends one text message. Failures are
       * reported on standard error and end the thread.
       */
      @Override
      public void run() {
         // The session is created here, on the worker thread, and closed
         // automatically when the try block exits.
         try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
            MessageProducer messageProducer = session.createProducer(destination);
            messageProducer.send(session.createTextMessage("My message"));
            System.out.println("Sent message");
         } catch (Exception e) {
            // Catch Exception rather than Throwable so JVM Errors are not swallowed.
            e.printStackTrace();
         }
      }
   }

   /**
    * Looks up the queue and connection factory via JNDI, runs one consumer and
    * one producer thread against a shared connection, and waits for both to end.
    *
    * @param args unused
    * @throws Exception if the JNDI lookup, connection creation, or thread join fails
    */
   public static void main(String... args) throws Exception {
      MyMultiThreadedApp myMultiThreadedApp = new MyMultiThreadedApp();
      InitialContext initialContext = new InitialContext();
      try {
         Queue queue = (Queue) initialContext.lookup("queue/exampleQueue");
         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
         // Close the connection once both workers have finished; leaving it
         // open leaks client resources.
         try (Connection connection = cf.createConnection()) {
            Thread myConsumer = myMultiThreadedApp.runConsumer(connection, queue);
            Thread myProducer = myMultiThreadedApp.runProducer(connection, queue);
            myConsumer.join();
            myProducer.join();
         }
      } finally {
         // InitialContext is not AutoCloseable, so it is closed explicitly.
         initialContext.close();
      }
   }

   /**
    * Creates and starts a consumer thread.
    *
    * @param connection  shared connection handed to the consumer
    * @param destination destination the consumer receives from
    * @return the started thread, so the caller can join on it
    */
   private Thread runConsumer(Connection connection, Destination destination) {
      MyConsumer myConsumer = new MyConsumer(connection, destination);
      myConsumer.start();
      return myConsumer;
   }

   /**
    * Creates and starts a producer thread.
    *
    * @param connection  shared connection handed to the producer
    * @param destination destination the producer sends to
    * @return the started thread, so the caller can join on it
    */
   private Thread runProducer(Connection connection, Destination destination) {
      MyProducer myProducer = new MyProducer(connection, destination);
      myProducer.start();
      return myProducer;
   }
}

Upvotes: 1

Related Questions