Reputation: 33
I have an IbmMqEndpoint that receives messages. If an error occurs during message processing, I want the message to remain on the original queue. The following implementation works flawlessly for error-free messages.
@Bean(name = "ibmMqListenerContainerFactory")
public DefaultJmsListenerContainerFactory ibmMqListenerContainerFactory(
@Qualifier("ibmMqConnectionFactory") ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true);
factory.setBackOff(new FixedBackOff(5000L, 3));
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
logger.warn("spring jms custom error handling example");
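// getCause() may be null, so fall back to the throwable's own message
logger.error(t.getCause() != null ? t.getCause().getMessage() : t.getMessage());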
throw new MessageSendException("Error", new Exception("Test"));
}
});
return factory;
}
The following registers the MessageListener:
private SimpleJmsListenerEndpoint getJmsListenerEndpoint(String endpoint) {
SimpleJmsListenerEndpoint jmsListenerEndpoint = new SimpleJmsListenerEndpoint();
jmsListenerEndpoint.setId(endpoint);
jmsListenerEndpoint.setDestination(endpoint);
jmsListenerEndpoint.setMessageListener(m -> {
try {
byte[] body = m.getBody(byte[].class);
MyMessage myMessage = new MyMessage(body);
messageHandler.accept(myMessage);
} catch (MessageSendException | JMSException e) {
throw new MessageSendException(e);
}
});
return jmsListenerEndpoint;
}
However, once the message listener is set and I send a faulty message, the MessageSendException gets thrown. The underlying DefaultMessageListenerContainer implementation then resets the BackOff variable currentAttempts by calling backOff.start() in the method
private void waitBeforeRecoveryAttempt() {
BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
DefaultMessageListenerContainer.this.applyBackOffTime(execution);
}
What caught my attention is that the method handleListenerSetupFailure in DefaultMessageListenerContainer is called. But this isn't a listener setup failure, is it? I thought this was a message-handling failure, as the setup took place on application boot-up.
Are there any Spring JMS experts here who can explain how I can implement the three backoff attempts using the Spring Boot framework? In general, I don't understand why there is a backOff option for the DefaultMessageListenerContainer that always ends up in an infinite loop...
Upvotes: 0
Views: 192
Reputation: 33
I think I will have to answer my own question and close the question as "not reproducible".
@caughts I still won't accept your answer, as the solution is too general in its own application context and doesn't relate closely enough to my problem. It is a working example, ok. But I have a different architectural approach. Also, the documentation you linked to doesn't explicitly state that a backout queue is required in order to get Spring JMS working with its backoff implementation. In other words: the link is too specific to IBM. Further, the example you provided doesn't use the same abstracted framework approach that I use. It could be considered a workaround for my current implementation, but it doesn't solve my problem on my level of architectural abstraction.
On the one hand, I should've been more specific about my environment and provided more implementation details, for example which versions I use. On the other hand, I am also a bit surprised that nobody asked about the versions I was using.
During the last month we upgraded our Spring versions, and I thought the upgrades could be the reason for the strange reset behavior of the Spring framework. But today I reset my workspace to an older revision, one that should contain the logic that produced the strange behavior, and the behavior isn't reproducible anymore. I am not sure what the reason was; it could've been a workspace inconsistency. However, I am still wondering what purpose setBackOff serves, and how and when its attempt counter is incremented by the Spring framework with my configuration shown above.
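To make the counter question concrete, here is a minimal plain-Java sketch of the reset effect described in the question. It is a toy model, not Spring's code: FixedBackOffModel and its Execution class are hypothetical stand-ins, written under the assumption that the real FixedBackOff hands out a fresh execution (with a zeroed attempt counter) on every start() call, which is what the waitBeforeRecoveryAttempt snippet in the question suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a fixed back-off policy; NOT Spring's FixedBackOff.
class FixedBackOffModel {
    static final long STOP = -1L; // sentinel meaning "give up"

    private final long interval;
    private final long maxAttempts;

    FixedBackOffModel(long interval, long maxAttempts) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
    }

    // Assumption: every start() returns a new execution whose counter begins at zero.
    Execution start() {
        return new Execution();
    }

    class Execution {
        private long currentAttempts = 0;

        // Returns the wait time, or STOP once maxAttempts has been exceeded.
        long nextBackOff() {
            currentAttempts++;
            return currentAttempts <= maxAttempts ? interval : STOP;
        }
    }

    // One execution shared across all failures: the counter keeps growing.
    static List<Long> reuseOneExecution(FixedBackOffModel backOff, int failures) {
        Execution execution = backOff.start();
        List<Long> waits = new ArrayList<>();
        for (int i = 0; i < failures; i++) {
            waits.add(execution.nextBackOff());
        }
        return waits;
    }

    // A new execution per failure: the counter never gets past 1, so STOP is never reached.
    static List<Long> restartEveryTime(FixedBackOffModel backOff, int failures) {
        List<Long> waits = new ArrayList<>();
        for (int i = 0; i < failures; i++) {
            waits.add(backOff.start().nextBackOff());
        }
        return waits;
    }

    public static void main(String[] args) {
        FixedBackOffModel backOff = new FixedBackOffModel(5000L, 3);
        System.out.println(reuseOneExecution(backOff, 5)); // [5000, 5000, 5000, -1, -1]
        System.out.println(restartEveryTime(backOff, 5));  // [5000, 5000, 5000, 5000, 5000]
    }
}
```

If the container really does call start() before each recovery wait, a limit of three attempts would only ever be exhausted by failures counted within a single execution, which would be consistent with the endless retries I observed.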
Upvotes: 0
Reputation: 4737
So long as the backout count on the message is being correctly implemented, and a backout threshold and backout queue are defined for the queue you are listening on, then the underlying IBM JMS code should move the message to the backout queue, when the backout count reaches the threshold.
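As a rough illustration of that rule, the following plain-Java sketch models a queue with a backout threshold. It does not use the IBM MQ classes; BackoutModel, Msg, and drain are hypothetical names, and the exact point at which the real client compares the count against the threshold is an assumption here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Toy model of backout handling; NOT the IBM MQ implementation.
class BackoutModel {
    static final class Msg {
        final String body;
        int backoutCount; // incremented on every rollback

        Msg(String body) {
            this.body = body;
        }
    }

    final Deque<Msg> inputQueue = new ArrayDeque<>();
    final List<Msg> backoutQueue = new ArrayList<>();
    final int backoutThreshold;

    BackoutModel(int backoutThreshold) {
        this.backoutThreshold = backoutThreshold;
    }

    // Delivers messages until the input queue is empty.
    // The listener returns true to commit and false to roll back.
    // Returns the total number of delivery attempts made.
    int drain(Predicate<String> listener) {
        int deliveries = 0;
        while (!inputQueue.isEmpty()) {
            Msg msg = inputQueue.peekFirst();
            if (msg.backoutCount >= backoutThreshold) {
                // Threshold reached: move the message instead of delivering it again.
                backoutQueue.add(inputQueue.pollFirst());
                continue;
            }
            deliveries++;
            if (listener.test(msg.body)) {
                inputQueue.pollFirst(); // commit: message is consumed
            } else {
                msg.backoutCount++;     // rollback: message stays on the queue
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        BackoutModel model = new BackoutModel(3);
        model.inputQueue.add(new Msg("ok"));
        model.inputQueue.add(new Msg("poison"));
        int deliveries = model.drain(body -> !body.equals("poison"));
        System.out.println("deliveries = " + deliveries);                   // deliveries = 4
        System.out.println("backed out = " + model.backoutQueue.size());    // backed out = 1
    }
}
```

In this model the poison message is delivered three times and then moved, so without a threshold and a backout queue the rollback loop would have no exit.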
If this is not happening, first check that the backout count is being incremented. If it is, check whether the backout queue and threshold are set. You can log these in your code, using the sample code below from this repo:
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class CmdRunner115 implements CommandLineRunner{
protected final Log logger = LogFactory.getLog(getClass());
@Value("${app.l115.queue.name1:DEV.QUEUE.1}")
private String queueName = "";
private final MQQueueManager mqQueueManager;
CmdRunner115(MQQueueManager mqQueueManager) {
this.mqQueueManager = mqQueueManager;
}
@Override
public void run(String ... args) throws Exception{
logger.info("Determining Backout threshold");
try {
int[] selectors = {
CMQC.MQIA_BACKOUT_THRESHOLD,
CMQC.MQCA_BACKOUT_REQ_Q_NAME };
int[] intAttrs = new int[1];
byte[] charAttrs = new byte[CMQC.MQ_Q_NAME_LENGTH];
int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF | CMQC.MQOO_INQUIRE | CMQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
logger.info("Queue Obtained");
MQManagedObject moMyQueue = (MQManagedObject) myQueue;
moMyQueue.inquire(selectors, intAttrs, charAttrs);
int boThresh = intAttrs[0];
// The name is returned in a fixed-length buffer, so trim the padding
String backoutQname = new String(charAttrs).trim();
logger.info("Backout Threshold: " + boThresh);
logger.info("Backout Queue: " + backoutQname);
} catch (MQException e) {
logger.warn("MQException Error obtaining backout threshold");
logger.warn(e.getMessage());
}
}
}
If you have an incrementing backout count, a backout threshold and a backout queue, and the message isn't being moved to the backout queue, check for errors indicating access failure to the backout queue.
** Update 6 August 2024 ** In response to your comment.
Yes, a backout queue is needed, along with a series of transaction-related Spring annotations. There is a working example in this repo.
There, take a look at the simple sample, where the annotations in use are:
@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class Application {
or the responder in the request-response sample where the annotations are:
@Component
public class Responder implements SessionAwareMessageListener {
@JmsListener(destination = Requester.qName)
@Transactional(rollbackFor = Exception.class)
public void onMessage(Message msg, Session session) throws JMSException {
Upvotes: 0