Chetan Sistla
Chetan Sistla

Reputation: 175

How to pause and resume asynchronous consumption of JMS messages

Iam building an application using activeMQ where i have a producer and a consumer. In the consumer iam using a MessageListener to asynchronously listen to messages from producer which is done using a method called onMessage(Message message). But before consuming the messages i want to perform a condition check and then consume the messages. I do not want to use synchronous consumption of message because it will be against my design.

  void initialize() throws JMSException {
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Destination destination = session.createQueue("testQ");
            this.consumer = session.createConsumer(destination);
            this.consumer.setMessageListener(this);
}

Check Condition here like detection of internet connection etc

public void onMessage(final Message message) {
        Preconditions.checkNotNull(message);

        if (!(message instanceof TextMessage)) {
            _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
            throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled");
        }

        try {
            final String messageType = message.getStringProperty("messageType");
            Preconditions.checkNotNull(messageType);
            _LOG.info("The MessageType is {}", messageType);

            final String msg = ((TextMessage) message).getText();
            Preconditions.checkNotNull(msg);
            _LOG.debug(msg);

            process(messageType, msg);
        } catch (final JMSException e) {
            _LOG.error("We could not read the message", e);
        }
    }

Any code example would be great.

Upvotes: 0

Views: 2541

Answers (2)

Shashi
Shashi

Reputation: 15273

It's not clear as to why you want to make checks because if there is any issue with connection, then your application would be notified of such error. You can setup an ExceptionListener for JMS Connection which will get invoked if there is an issue with connection.

The onMessage would not be called if there is a connection issue.

Also I would push the connection.start() method call after setting up the message listener for consumer. A connection.start() invocation indicates to messaging provider that the application is ready to receive messages.

There is connection.stop() to pause/stop message delivery. You can issue connection.start() again to resume message delivery.


Update based on your response

I suggest you use a session with client acknowledge mode.

final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)

Then in onMessage method, if there is no internet connection, don't acknowledge the message. The message will be delivered again if it has not expired.

Upvotes: 1

Sangam Belose
Sangam Belose

Reputation: 4506

In Asynchronous messaging you can always check the condition before processing the message. Once the JMS message received at your Listener code(i.e. OnMessage method ) it is up to you to give the further direction. I had solved such problem where I wanted to check the JMS message with my db Content.

check the internet connection is alive by making urlConnection to the webservice url something like below:

  public static boolean isReachable(String targetUrl) throws IOException
{
   try {
     HttpURLConnection httpUrlConnection = (HttpURLConnection) new URL(
        targetUrl).openConnection();
    httpUrlConnection.setRequestMethod("HEAD");


    int responseCode = httpUrlConnection.getResponseCode();

    return responseCode == HttpURLConnection.HTTP_OK;
} catch (UnknownHostException noInternetConnection)
  {
    return false;
   }
}

And then in your onMessage method call the method as

public void onMessage(final Message message) {
    if(isReachable){
    Preconditions.checkNotNull(message);

    if (!(message instanceof TextMessage)) {
        _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
        throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled");
    }

    try {
        final String messageType = message.getStringProperty("messageType");
        Preconditions.checkNotNull(messageType);
        _LOG.info("The MessageType is {}", messageType);

        final String msg = ((TextMessage) message).getText();
        Preconditions.checkNotNull(msg);
        _LOG.debug(msg);

        process(messageType, msg);
    } catch (final JMSException e) {
        _LOG.error("We could not read the message", e);
    }
  }
}

Upvotes: 1

Related Questions