madawa

Reputation: 486

Access BOTHRESH value of IBM MQ using JMS API

I'm using Spring Boot with mq-jms-spring-boot-starter to create a JMS listener application that reads a message from a queue, processes it, and forwards it to another queue.

In a poison-message scenario, I want to generate an alert. To avoid generating multiple alerts for the same message, I'm thinking of comparing JMSXDeliveryCount with the BOTHRESH value and generating the alert on the last redelivery, before the message is sent to the BOQ.

BOTHRESH and BOQNAME are configured for the source queue.

    @JmsListener(destination = "${sourceQueue}")
    public void processMessages(Message message) {
        TextMessage msg = (TextMessage) message;
        // Initialised up front so both values are definitely assigned when the catch block reads them
        int boThresh = Integer.MAX_VALUE;
        int redeliveryCount = 0;
        try {
            boThresh = message.getIntProperty("<WHAT COMES HERE>");
            redeliveryCount = message.getIntProperty("JMSXDeliveryCount");
            String processedMessage = this.processMessage(message);
            this.forwardMessage("destinationQueue", processedMessage);
        } catch (Exception e) {
            if (redeliveryCount >= boThresh) {
                // generate alert here
            }
        }
    }

How can I get the value of BOTHRESH here? Is it possible at all? I tried listing all the available properties with the getPropertyNames() method, and the following are all the properties I see.

Upvotes: 1

Views: 1312

Answers (2)

Matt Pavlovich

Reputation: 4316

This sounds like you are mixing retriable and non-retriable error handling. If you are tracking redeliveries and need to send an alert, then you probably do not want to set a BOTHRESH value at all, and should instead manage it all in your client-side code.

Recommended consumer error handling pattern:

  1. If the message is invalid (e.g., bad JSON or XML), move it to the DLQ immediately. The message will never improve in quality, so there is no reason to retry it repeatedly.

  2. If the 'next step' in processing is down (e.g., the database), reject delivery and allow redelivery delays and backout retries to kick in. This also lets other consumers on the queue attempt to process the message, and it prevents a single consumer with a dead path from holding up messages.
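
As a rough sketch of the two cases above: the exception types and the ConsumerErrorPolicy helper below are hypothetical names chosen for illustration, not part of JMS or IBM MQ, and the dead-letter sender is just a callback standing in for whatever code puts the message on your DLQ. The idea is that an invalid message is parked immediately, while a transient failure is allowed to propagate so that (assuming a transacted listener) the session rolls back and the broker redelivers.

```java
import java.util.function.Consumer;

/** Hypothetical marker for messages that can never succeed (e.g. malformed JSON). */
class InvalidMessageException extends RuntimeException {
    InvalidMessageException(String reason) { super(reason); }
}

/** Hypothetical marker for temporary failures (e.g. the database is down). */
class TransientFailureException extends RuntimeException {
    TransientFailureException(String reason) { super(reason); }
}

final class ConsumerErrorPolicy {
    enum Outcome { PROCESSED, DEAD_LETTERED }

    private ConsumerErrorPolicy() {}

    /**
     * Runs the processor on the message body.
     * Invalid messages go straight to the dead-letter callback.
     * Transient failures are deliberately NOT caught, so they propagate to the caller.
     */
    static Outcome handle(String body,
                          Consumer<String> processor,
                          Consumer<String> deadLetterSender) {
        try {
            processor.accept(body);
            return Outcome.PROCESSED;
        } catch (InvalidMessageException e) {
            // Non-retriable: retrying will not fix the payload, so park it now
            deadLetterSender.accept(body);
            return Outcome.DEAD_LETTERED;
        }
    }
}
```

Called from a listener method, a TransientFailureException escaping handle(...) would leave redelivery delays and the backout threshold to do their job, with no alerting logic in the consumer itself.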

Also, consider that using client-side consumer code to do monitoring and alerting can be problematic, since it combines different functions. If your goal is to track invalid messages, monitoring the DLQ is generally a better design pattern and it removes 'monitoring' code from your consumer code.

Upvotes: 1

chughts

Reputation: 4737

This will do it, but the code does need admin access to an admin channel, which may not be optimal for a client application.

The Configuration

    import com.ibm.mq.MQException;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.constants.CMQC;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.Hashtable;

    @Configuration
    public class MQConfiguration {
        protected final Log logger = LogFactory.getLog(getClass());

        @Value("${ibm.mq.queueManager:QM1}")
        public String qMgrName;

        @Value("${app.mq.admin.channel:DEV.ADMIN.SVRCONN}")
        private String adminChannel;

        @Value("${app.mq.host:localhost}")
        private String host;

        @Value("${app.mq.host.port:1414}")
        private int port;

        @Value("${app.mq.adminuser:admin}")
        private String adminUser;

        @Value("${app.mq.adminpassword:passw0rd}")
        private String password;

        @Bean
        public MQQueueManager mqQueueManager() {
            try {
                // Client-mode connection details for the admin channel
                Hashtable<String, Object> connectionProperties = new Hashtable<>();

                connectionProperties.put(CMQC.CHANNEL_PROPERTY, adminChannel);
                connectionProperties.put(CMQC.HOST_NAME_PROPERTY, host);
                connectionProperties.put(CMQC.PORT_PROPERTY, port);
                connectionProperties.put(CMQC.USER_ID_PROPERTY, adminUser);
                connectionProperties.put(CMQC.PASSWORD_PROPERTY, password);

                return new MQQueueManager(qMgrName, connectionProperties);

            } catch (MQException e) {
                // Fail fast: a null bean would only defer the failure to its first use
                logger.warn("MQException obtaining MQQueueManager", e);
                throw new IllegalStateException("Unable to connect to queue manager " + qMgrName, e);
            }
        }

    }


Obtain the Queue's backout threshold


    import com.ibm.mq.MQException;
    import com.ibm.mq.MQQueue;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.constants.CMQC;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    @Component
    public class Runner {
        protected final Log logger = LogFactory.getLog(getClass());

        @Value("${app.mq.queue:DEV.QUEUE.1}")
        private String queueName = "";

        private final MQQueueManager mqQueueManager;

        Runner(MQQueueManager mqQueueManager) {
            this.mqQueueManager = mqQueueManager;
        }

        @Bean
        CommandLineRunner init() {
            return (args) -> {
                logger.info("Determining Backout threshold");
                try {
                    // One integer attribute (BOTHRESH) and one character attribute (BOQNAME)
                    int[] selectors = {
                            CMQC.MQIA_BACKOUT_THRESHOLD,
                            CMQC.MQCA_BACKOUT_REQ_Q_NAME };
                    int[] intAttrs = new int[1];
                    byte[] charAttrs = new byte[CMQC.MQ_Q_NAME_LENGTH];

                    // MQOO_INQUIRE is the only open option needed to read queue attributes
                    int openOptions = CMQC.MQOO_INQUIRE;
                    MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
                    logger.info("Queue Obtained");

                    // inquire is inherited from MQManagedObject, so no cast is required
                    myQueue.inquire(selectors, intAttrs, charAttrs);
                    myQueue.close();

                    int boThresh = intAttrs[0];
                    // The queue name is blank-padded to MQ_Q_NAME_LENGTH, so trim it
                    String backoutQname = new String(charAttrs).trim();

                    logger.info("Backout Threshold: " + boThresh);
                    logger.info("Backout Queue: " + backoutQname);

                } catch (MQException e) {
                    logger.warn("MQException Error obtaining threshold");
                    logger.warn(e.getMessage());
                }
            };
        }
    }
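
Once the threshold has been read, the comparison from the question could be sketched roughly as below. BackoutAlertPolicy and isFinalDelivery are hypothetical names, and the sketch assumes that JMSXDeliveryCount starts at 1 on the first delivery and that a BOTHRESH of 0 means backout handling is disabled; check both assumptions against your queue manager's behaviour before relying on it.

```java
final class BackoutAlertPolicy {
    private BackoutAlertPolicy() {}

    /**
     * @param deliveryCount    value of the JMSXDeliveryCount property
     * @param backoutThreshold BOTHRESH of the source queue, e.g. as read via inquire
     * @return true when this looks like the last delivery before the message is backed out
     */
    static boolean isFinalDelivery(int deliveryCount, int backoutThreshold) {
        // Assumption: a threshold of 0 (or less) means the message is never moved to the BOQ
        if (backoutThreshold <= 0) {
            return false;
        }
        return deliveryCount >= backoutThreshold;
    }
}
```

In the listener's catch block this would replace the inline comparison, for example `if (BackoutAlertPolicy.isFinalDelivery(redeliveryCount, boThresh)) { ... }`, with boThresh cached once at startup rather than inquired per message.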

Upvotes: 2
