Reputation: 363
I'm trying to consume messages from multiple clients and respond that I got the message, but when the first client sends a message, it doesn't get any response, and none of the other clients can send any messages. It's like my consumer class is locked. Any ideas why?
/**
 * JMS consumer for the "BookingQueue" queue. onMessage() is written to answer
 * each incoming Booking with a text message sent to the request's reply-to
 * destination.
 *
 * All JMS handles are static fields shared between main() and onMessage().
 *
 * NOTE(review): nothing in this class registers a Consumer instance as a
 * listener (the only setMessageListener call is commented out in main), so as
 * shown onMessage() is never wired to the consumer created in main().
 */
public class Consumer implements MessageListener {
// Shared JMS state, populated by main() and reused (and closed) by onMessage().
private static ConnectionFactory factory = null;
private static Connection connection = null;
private static Session session = null;
// Reply destination; overwritten on every message from its JMSReplyTo header.
private static Destination sendQueue = null;
// The "BookingQueue" queue this class consumes from.
private static Destination recieveQueue = null;
private static MessageConsumer consumer = null;
private static MessageProducer producer = null;
final static Logger logger = Logger.getLogger(Consumer.class);
/**
 * Connects to the broker at tcp://localhost:61616, starts the connection and
 * creates a non-transacted, auto-acknowledge session with a consumer on
 * "BookingQueue". Any exception is printed and otherwise ignored.
 */
public static void main(String[] args) {
try {
factory = new ActiveMQConnectionFactory( "tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
recieveQueue = session.createQueue("BookingQueue");
consumer = session.createConsumer(recieveQueue);
// Listener registration is disabled, and it refers to JMSMessageListener
// rather than this class; no receive() call is made either, so main()
// returns without ever reading a message.
// JMSMessageListener listener = new JMSMessageListener();
// consumer.setMessageListener(listener);
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
}
/**
 * Handles one incoming message. Only ObjectMessage instances are processed;
 * any other message type is silently ignored.
 *
 * For an ObjectMessage the payload is cast to Booking, a fixed text reply is
 * sent non-persistently to the message's reply-to destination, and then the
 * shared consumer, session and connection are closed.
 *
 * @param message the delivered JMS message
 */
@Override
public void onMessage(Message message) {
if (message instanceof ObjectMessage) {
ObjectMessage msg = (ObjectMessage)message;
Booking booking;
try {
booking = (Booking) msg.getObject();
logger.info("Received order for " + booking.getCustomer());
// NOTE(review): no null check on the reply-to header before using it as the
// producer's destination — confirm every sender sets JMSReplyTo.
sendQueue = message.getJMSReplyTo();
// A new producer is created per message and is never closed individually.
producer = session.createProducer(sendQueue);
logger.info("# The Agent sending hello");
TextMessage messageNew = session.createTextMessage("Hello, please reply immediately to my message!");
// Replies to this reply are directed back to the queue this class consumes.
messageNew.setJMSReplyTo(recieveQueue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(messageNew);
// The shared static consumer, session and connection are closed after the
// first handled message, so this class cannot process any later message.
// NOTE(review): the JMS documentation for Connection.close() warns against a
// listener closing its own connection — confirm whether this can block.
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
Upvotes: 0
Views: 183
Reputation: 18148
You are not registering a `MessageListener` with a `Session`. You can fix this by creating an infinite loop that polls the `Queue` using a `MessageConsumer`, sending an acknowledgment using a new `MessageProducer`. Something like the following:
/**
 * Polls the "BookingQueue" queue on its own thread and answers each received
 * message on that message's reply-to destination.
 *
 * <p>Two sessions are created on the supplied connection: one used only for
 * receiving and one used only for producing replies. Both are used solely by
 * the thread executing {@link #run()}. {@link #close()} may be called from a
 * different thread to stop the loop; the {@code closed} flag is volatile so
 * that the polling thread observes the request.
 */
public class Consumer implements Runnable {
    private final Connection connection;
    private final MessageConsumer consumer;
    private final Session producerSession;
    private volatile boolean closed = false;

    /**
     * Creates the receiving and producing sessions on the given connection.
     *
     * @param connection an open JMS connection; the caller is responsible for
     *        starting it so that messages are delivered
     * @throws IllegalStateException if a session or the queue consumer cannot
     *         be created; the underlying JMS failure is attached as the cause
     */
    public Consumer(Connection connection) {
        this.connection = connection;
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination receiveQueue = session.createQueue("BookingQueue");
            consumer = session.createConsumer(receiveQueue);
            producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            // JMS calls throw a checked exception; it is wrapped so the
            // constructor signature stays free of a throws clause.
            throw new IllegalStateException("Failed to set up JMS sessions for BookingQueue", e);
        }
    }

    /**
     * Requests the polling loop to stop and closes the connection, which also
     * releases the sessions, consumer and producers created from it.
     *
     * @throws IllegalStateException if closing the connection fails
     */
    public void close() {
        // Set the flag first so run() can tell a shutdown from a real failure.
        closed = true;
        try {
            connection.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close JMS connection", e);
        }
    }

    /**
     * Blocks on the queue and replies to each message until {@link #close()}
     * is called.
     *
     * @throws IllegalStateException if a JMS operation fails while the
     *         consumer has not been closed
     */
    @Override
    public void run() {
        try {
            while (!closed) {
                Message message = consumer.receive();
                if (message == null) {
                    // receive() yields null once the consumer has been closed.
                    break;
                }
                Destination sendQueue = message.getJMSReplyTo();
                if (sendQueue == null) {
                    // The sender did not ask for a reply; nothing to answer.
                    continue;
                }
                MessageProducer producer = producerSession.createProducer(sendQueue);
                try {
                    // send message via producer
                } finally {
                    // Release the per-message producer even if sending fails.
                    producer.close();
                }
            }
        } catch (Exception e) {
            if (closed) {
                // Failures caused by close() racing with the loop are expected.
                return;
            }
            throw new IllegalStateException("JMS failure while consuming BookingQueue", e);
        }
    }
}
The `MessageListener` has been replaced with a call to `consumer.receive()`.
Upvotes: 1