Reputation: 765
QUEUE_INI use IBM MQ9, latest spring boot, and IBM's spring-boot-starter.
I'd like to receive a message with @JmsListner, then without committing it, send another message to another queue, receive response and then commit all.
So far I has next:
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
I'm stuck on 'Message sent'. It looks like sub-request hasn't really sent. I see in MQ UI that queue depth is 1 but there is no message inside, and my sub-request listener also doesn't see any messages.
I've also tried use sendAndReceive
method:
Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session -> {
Message msg = session.createTextMessage();
msg.setStringProperty("type", "com.example.SubRequest");
LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
return msg;
});
But I do not have permissions to access model queue.
Is there any way to make this work?
UPDATE:
I made this work with combined help from you all. I end up with separate service to only send sub request with @Transactional(propagation = Propagation.REQUIRES_NEW)
. All other logic remained within main listener.
Also turning on transactions start/end logs was helpful:
logging:
level:
org.springframework.transaction.interceptor: trace
Upvotes: 1
Views: 7769
Reputation: 402
Unfortunately I am unable to add a comment to the previous answer as I don’t have sufficient privilege.
As @Nicholas says, your message receive is in the same transaction as the message send. Until the receiveMessage
method completes the initial send won’t be committed to the queue for the receiver to consume. I am assuming you might have the receive configured with something like RECEIVE_TIMEOUT_INDEFINITE_WAIT
which would then block completion of the method.
With this configuration, you already have a Message Listener bound to QUEUE_IN
. Messages delivered to that queue would be delivered to just one consumer. I see you are using selectors to avoid this problem.
One option might be to introduce another Message Listener for type = com.example.SubResponse
and remove the blocking receive from the first.
Upvotes: 3
Reputation: 16066
Your outbound message sender is enrolled in the same transaction as your message receiver. So the receiver of the outbound message won't see the message until the transaction commits. I think you need to start a new transaction to perform the inner procedure.
== Update ==
So it's been a while, and I don't have a dev environment set up for this, but I suggest something like this. Essentially, you're splitting up the Listener and Sender/Receiver into two separate classes. The Sender/Receiver should be injected into your listener so the @Transactional annotation is honored.
public class MyListener {
private final MySender sender;
public MyListener(MySender sender) {
this.sender = sender;
}
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
Message reply = sender.sendAndReceive()
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
}
public class MySender {
private final JmsTemplate jmsTemplate2;
private final Destination QUEUE_OUT;
public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
this.jmsTemplate2 = jmsTemplate2;
this.QUEUE_OUT = QUEUE_OUT;
}
@Transactional(propagation=Propagation.NESTED) // Or REQUIRES_NEW, edepending on usage
public Message sendAndReceive() throws JMSException {
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
return reply;
}
}
The @Transactional(propagation=Propagation.NESTED) will start a new transaction if one is already running (and commit/rollback when the method exits), so you should be able to send/receive the message and return it to you listener.
Upvotes: 3