Akin Dönmez
Akin Dönmez

Reputation: 363

Message consumer is got blocked after first message

I'm trying to consume messages from multiple clients and respond that I got the message, but when the first client sends a message, he doesn't get any response, and none of the other clients can send any messages. Its like my consumer class is locked. Any ideas why?

public class Consumer implements MessageListener {
    private static ConnectionFactory factory = null;
    private static Connection connection = null;
    private static Session session = null;
    private static Destination sendQueue = null;
    private static Destination recieveQueue = null;
    private static MessageConsumer consumer = null;
    private static MessageProducer producer = null;

    final static Logger logger = Logger.getLogger(Consumer.class);

    public static void main(String[] args) {

        try {   
            factory = new ActiveMQConnectionFactory( "tcp://localhost:61616");
             connection = factory.createConnection();
            connection.start();
             session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            recieveQueue = session.createQueue("BookingQueue");
             consumer = session.createConsumer(recieveQueue);
        //  JMSMessageListener listener = new JMSMessageListener();
        //  consumer.setMessageListener(listener);

        } catch (Exception e) {
            System.out.println(e);
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        if (message instanceof ObjectMessage) {
            ObjectMessage msg = (ObjectMessage)message;
            Booking booking;
            try {
                booking = (Booking) msg.getObject();
                logger.info("Received order for " + booking.getCustomer());
                sendQueue = message.getJMSReplyTo();
                producer = session.createProducer(sendQueue);
                logger.info("# The Agent sending hello");
                TextMessage messageNew = session.createTextMessage("Hello, please reply immediately to my message!");   
                messageNew.setJMSReplyTo(recieveQueue);

                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.send(messageNew);
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

    }

    }


}

Upvotes: 0

Views: 183

Answers (1)

Zim-Zam O'Pootertoot
Zim-Zam O'Pootertoot

Reputation: 18148

You are not registering a MessageListener with a Session. You can fix this by creating an infinite loop that polls the Queue using a MessageConsumer, sending an acknowledgment using a new MessageProducer. Something like the following:

public class Consumer implements Runnable {
    private final Connection connection;
    private final MessageConsumer consumer;
    private final Session producerSession;
    private volatile boolean closed = false;

    public Consumer(Connection connection) {
        this.connection = connection;
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination receiveQueue = session.createQueue("BookingQueue");
        consumer = session.createConsumer(receiveQueue);
        producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public void close() {
        closed = true;
        connection.close();
    }

    @Override
    public void run() {
        while(!closed) {
            Message message = consumer.receive();
            Destination sendQueue = message.getJMSReplyTo();
            MessageProducer producer = producerSession.createProducer(sendQueue);
            // send message via producer
            producer.close();
        }
    }
}

The MessageListener has been replaced with a call to consumer.receive()

Upvotes: 1

Related Questions