Reputation: 780
I guess it is kind of a standard knowledge question, but will current_session
run threaded or not?
ActiveMQSession current_session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Thread sessionThreads = new Thread(current_session);
sessionThreads.start();
If not, can please someone show me a code example how I run a session threaded?
What I want is concurrent sessions with producers/consumers which write/listen to their specific queues. I already tried to write a custom Thread by passing the connection to the thread, but when I created Producers I run into an error, 'that I can't start a Producer on an unregistered session'.
Upvotes: 0
Views: 47
Reputation: 34998
There is no constructor for java.lang.Thread
that will accept a org.apache.activemq.ActiveMQSession
object. Your code won't even compile let alone run.
Here's a simple client that will create threads for producing and consuming messages:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
public class MyMultiThreadedApp {
class MyConsumer extends Thread {
private final Connection connection;
private final Destination destination;
MyConsumer(Connection connection, Destination destination) {
this.connection = connection;
this.destination = destination;
}
@Override
public void run() {
try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
MessageConsumer messageConsumer = session.createConsumer(destination);
connection.start();
Message message = messageConsumer.receive(5000);
if (message == null) {
System.out.println("Did not receive message within the allotted time.");
return;
}
System.out.println("Received message: " + message);
} catch (Throwable e) {
e.printStackTrace();
return;
}
}
}
class MyProducer extends Thread {
private final Connection connection;
private final Destination destination;
MyProducer(Connection connection, Destination destination) {
this.connection = connection;
this.destination = destination;
}
@Override
public void run() {
try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.send(session.createTextMessage("My message"));
System.out.println("Sent message");
} catch (Throwable e) {
e.printStackTrace();
return;
}
}
}
public static void main(String... args) throws Exception {
MyMultiThreadedApp myMultiThreadedApp = new MyMultiThreadedApp();
InitialContext initialContext = null;
initialContext = new InitialContext();
Queue queue = (Queue) initialContext.lookup("queue/exampleQueue");
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
Connection connection = cf.createConnection();
Thread myConsumer = myMultiThreadedApp.runConsumer(connection, queue);
Thread myProducer = myMultiThreadedApp.runProducer(connection, queue);
myConsumer.join();
myProducer.join();
}
private Thread runConsumer(Connection connection, Destination destination) {
MyConsumer myConsumer = new MyConsumer(connection, destination);
myConsumer.start();
return myConsumer;
}
private Thread runProducer(Connection connection, Destination destination) {
MyProducer myProducer = new MyProducer(connection, destination);
myProducer.start();
return myProducer;
}
}
Upvotes: 1