Reputation: 486
I'm using Spring Boot with mq-jms-spring-boot-starter to create a JMS listener application that reads a message from a queue, processes it, and forwards it to another queue.
For poison messages, I want to generate an alert. To avoid raising multiple alerts for the same message, I'm thinking of comparing JMSXDeliveryCount with the BOTHRESH value and generating the alert on the last redelivery, just before the message is sent to the BOQ. BOTHRESH and BOQNAME are configured on the source queue.
@JmsListener(destination = "${sourceQueue}")
public void processMessages(Message message) {
TextMessage msg = (TextMessage) message;
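// Initialised so the catch block compiles; with these defaults no alert fires if the property lookup itself fails.
int boThresh = Integer.MAX_VALUE;
int redeliveryCount = 0;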
try {
boThresh = message.getIntProperty("<WHAT COMES HERE>");
redeliveryCount = message.getIntProperty("JMSXDeliveryCount");
String processedMessage = this.processMessage(message);
this.forwardMessage("destinationQueue", processedMessage);
} catch (Exception e) {
if (redeliveryCount >= boThresh) {
//generate alert here
}
}
}
How can I get the value of BOTHRESH here? Is it possible at all? I listed all the available properties with the getPropertyNames() method, and these are the only ones I see:
JMS_IBM_Format
JMS_IBM_PutDate
JMS_IBM_Character_Set
JMSXDeliveryCount
JMS_IBM_MsgType
JMSXUserID
JMS_IBM_Encoding
JMS_IBM_PutTime
JMSXAppID
JMS_IBM_PutApplType
Upvotes: 1
Views: 1312
Reputation: 4316
This sounds like you are mixing retriable and non-retriable error handling. If you are tracking redeliveries and need to send an alert, then you probably do not want to set a BOTHRESH value; instead, manage it all in your client-side code.
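As a rough illustration of keeping that decision in client code, the sketch below compares the delivery count with a limit owned by the application. `RedeliveryPolicy`, `maxAttempts`, and `isFinalAttempt` are hypothetical names, not part of any MQ or Spring API, and the sketch assumes `JMSXDeliveryCount` is 1 on the first delivery, as the JMS specification defines it.

```java
class RedeliveryPolicy {
    // Hypothetical client-side limit; it is NOT read from the queue's BOTHRESH.
    private final int maxAttempts;

    RedeliveryPolicy(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // deliveryCount is the value of JMSXDeliveryCount (assumed to be 1 on first delivery).
    boolean isFinalAttempt(int deliveryCount) {
        return deliveryCount >= maxAttempts;
    }

    public static void main(String[] args) {
        RedeliveryPolicy policy = new RedeliveryPolicy(3);
        for (int count = 1; count <= 4; count++) {
            System.out.println("delivery " + count + " finalAttempt=" + policy.isFinalAttempt(count));
        }
    }
}
```

In a listener, the alert would be raised only when `isFinalAttempt(...)` returns true for a failed message, so each poison message produces at most one alert per pass through the limit.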
Recommended consumer error handling pattern:
If the message is invalid (e.g., bad JSON or XML), move it to the DLQ immediately. The message will never improve in quality, and there is no reason to retry it repeatedly.
If the 'next step' in processing is down (e.g., the database), reject delivery and allow redelivery delays and backout retries to kick in. This also lets other consumers on the queue attempt to process the message, and avoids the problem where one consumer with a dead path holds up messages.
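The two cases above can be sketched as a small triage step. This is only an illustration under assumptions: the exception types, the `Action` enum, and the `triage` method are hypothetical names made up for the example, and treating unknown errors as retriable is a design choice, not something MQ prescribes.

```java
class ErrorTriage {
    enum Action { DEAD_LETTER, RETRY }

    // Hypothetical exception for messages that can never be processed (bad JSON, bad XML).
    static class InvalidMessageException extends RuntimeException {
        InvalidMessageException(String message) { super(message); }
    }

    // Hypothetical exception for a dependency that is temporarily down (e.g. the database).
    static class DownstreamUnavailableException extends RuntimeException {
        DownstreamUnavailableException(String message) { super(message); }
    }

    // Invalid messages go straight to the DLQ; everything else is left to redelivery.
    static Action triage(Exception e) {
        if (e instanceof InvalidMessageException) {
            return Action.DEAD_LETTER;
        }
        return Action.RETRY;
    }

    public static void main(String[] args) {
        System.out.println(triage(new InvalidMessageException("bad JSON")));
        System.out.println(triage(new DownstreamUnavailableException("database down")));
    }
}
```

With this split, `RETRY` would correspond to letting the exception propagate out of the listener so the message is rolled back, while `DEAD_LETTER` would correspond to forwarding the message to the DLQ and completing normally.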
Also, consider that using client-side consumer code to do monitoring and alerting can be problematic, since it combines different functions. If your goal is to track invalid messages, monitoring the DLQ is generally a better design pattern and it removes 'monitoring' code from your consumer code.
Upvotes: 1
Reputation: 4737
This will do it, but the code needs admin access over an admin channel, which may not be optimal for a client application.
The Configuration
import com.ibm.mq.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.ibm.mq.constants.CMQC;
import java.util.Hashtable;
@Configuration
public class MQConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${ibm.mq.queueManager:QM1}")
public String qMgrName;
@Value("${app.mq.admin.channel:DEV.ADMIN.SVRCONN}")
private String adminChannel;
@Value("${app.mq.host:localhost}")
private String host;
@Value("${app.mq.host.port:1414}")
private int port;
@Value("${app.mq.adminuser:admin}")
private String adminUser;
@Value("${app.mq.adminpassword:passw0rd}")
private String password;
@Bean
public MQQueueManager mqQueueManager() {
try {
Hashtable<String,Object> connectionProperties = new Hashtable<String,Object>();
connectionProperties.put(CMQC.CHANNEL_PROPERTY, adminChannel);
connectionProperties.put(CMQC.HOST_NAME_PROPERTY, host);
connectionProperties.put(CMQC.PORT_PROPERTY, port);
connectionProperties.put(CMQC.USER_ID_PROPERTY, adminUser);
connectionProperties.put(CMQC.PASSWORD_PROPERTY, password);
return new MQQueueManager(qMgrName, connectionProperties);
} catch (MQException e) {
logger.warn("MQException obtaining MQQueueManager");
logger.warn(e.getMessage());
}
return null;
}
}
Obtain the Queue's backout threshold
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Runner {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${app.mq.queue:DEV.QUEUE.1}")
private String queueName = "";
private final MQQueueManager mqQueueManager;
Runner(MQQueueManager mqQueueManager) {
this.mqQueueManager = mqQueueManager;
}
@Bean
CommandLineRunner init() {
return (args) -> {
logger.info("Determining Backout threshold");
try {
int[] selectors = {
CMQC.MQIA_BACKOUT_THRESHOLD,
CMQC.MQCA_BACKOUT_REQ_Q_NAME };
int[] intAttrs = new int[1];
// CMQC replaces the deprecated MQC interface; the constant values are the same.
byte[] charAttrs = new byte[CMQC.MQ_Q_NAME_LENGTH];
int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF | CMQC.MQOO_INQUIRE | CMQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
logger.info("Queue Obtained");
MQManagedObject moMyQueue = (MQManagedObject) myQueue;
moMyQueue.inquire(selectors, intAttrs, charAttrs);
int boThresh = intAttrs[0];
// The name is blank-padded to MQ_Q_NAME_LENGTH, so trim it before use.
String backoutQname = new String(charAttrs).trim();
logger.info("Backout Threshold: " + boThresh);
logger.info("Backout Queue: " + backoutQname);
} catch (MQException e) {
logger.warn("MQException Error obtaining threshold");
logger.warn(e.getMessage());
}
};
}
}
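One detail worth handling when reading the result: the character attributes come back as blank-padded, fixed-width fields in the byte buffer. The helper below is a minimal sketch of decoding one such field; `CharAttrs` and `field` are hypothetical names, the index arithmetic assumes every field has the same width, and US-ASCII is assumed for the bytes, which may not match your queue manager's character set.

```java
import java.nio.charset.StandardCharsets;

class CharAttrs {
    // Same value as CMQC.MQ_Q_NAME_LENGTH; repeated here so the sketch has no MQ dependency.
    static final int Q_NAME_LENGTH = 48;

    // Returns the fixed-width field at the given index with padding removed.
    // Assumes all fields in the buffer share the same width.
    static String field(byte[] charAttrs, int index, int width) {
        int start = index * width;
        if (index < 0 || start + width > charAttrs.length) {
            throw new IllegalArgumentException("field " + index + " is outside the buffer");
        }
        // trim() removes trailing blanks and any NUL padding.
        return new String(charAttrs, start, width, StandardCharsets.US_ASCII).trim();
    }

    public static void main(String[] args) {
        // Simulated inquire() output: a queue name padded with blanks to 48 bytes.
        byte[] buffer = String.format("%-48s", "DEV.DEAD.LETTER.QUEUE")
                .getBytes(StandardCharsets.US_ASCII);
        System.out.println("[" + field(buffer, 0, Q_NAME_LENGTH) + "]");
    }
}
```

Without the trim, the backout queue name would carry its padding into log lines and into any later call that uses the name.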
Upvotes: 2