Reputation: 187
A requester is sending messages over a normal queue to a responder, indicating a dynamic queue it created as a reply queue. The responder puts these same messages on the reply queue. The responder retrieves all messages correctly.
For each message sent the requester obtains a message from the reply queue, but its body is filled with zeroes. Both programs are written in Java, using com.ibm.mq.allclient-9.2.2.0.jar. When I wrote the same in JavaScript with Node.js and ibmmq for node, everything worked fine.
Requester.java:
package com.hellerim.imq.comm.requester;
import static com.ibm.mq.constants.CMQC.MQENC_INTEGER_NORMAL;
import static com.ibm.mq.constants.CMQC.MQFMT_STRING;
import static com.ibm.mq.constants.CMQC.MQGMO_FAIL_IF_QUIESCING;
import static com.ibm.mq.constants.CMQC.MQGMO_NO_SYNCPOINT;
import static com.ibm.mq.constants.CMQC.MQGMO_NO_WAIT;
import static com.ibm.mq.constants.CMQC.MQGMO_WAIT;
import static com.ibm.mq.constants.CMQC.MQMT_REQUEST;
import static com.ibm.mq.constants.CMQC.MQOO_FAIL_IF_QUIESCING;
import static com.ibm.mq.constants.CMQC.MQOO_INPUT_EXCLUSIVE;
import static com.ibm.mq.constants.CMQC.MQOO_OUTPUT;
import static com.ibm.mq.constants.CMQC.MQPMO_NO_SYNCPOINT;
import static com.ibm.mq.constants.CMQC.MQRC_NO_MSG_AVAILABLE;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import io.netty.util.CharsetUtil;
import net.jcip.annotations.GuardedBy;
public class Requester
{
private static final int WAIT_WHILE_EMPTY = 100; // ms
private static int MAX_MILLIS_BETWEEN_REQUESTS = 100;
private static int LONG_WIDTH_IN_HEX_CHARS = 16;
private static final Charset charset = CharsetUtil.ISO_8859_1;
private MQQueueManager qMgr;
private final MQQueue requestQueue;
private final String queueNamePattern = "TEST.SESSION.*";
private String replyQueueName;
private final MQQueue replyQueue;
private final MQGetMessageOptions getOptions = new MQGetMessageOptions();
private static final String MQ_MANAGER = "MY_QM";
private static final String REQUEST_QUEUE = "TEST.REQUESTS";
private static final String MODEL_QUEUE = "TEST.SESSION.MODEL";
final private Object locker = new Object();
@GuardedBy("this")
boolean stopped = false;
int rcvd = 0;
public static void main(String[] args) {
try {
Requester rq = new Requester(MQ_MANAGER, REQUEST_QUEUE, MODEL_QUEUE);
List<String> poem = writePoem();
Random requestIds = new Random();
Random delays = new Random(1000);
int cnt = 0;
int position = 0;
for (int i = 0; i < 50; ++i) {
if (i == poem.size()) {
int requestId = requestIds.nextInt(99999) + 1;
String text = poem.stream().collect(Collectors.joining("\n"));
String request = appRequestFrom(text, requestId);
rq.write(request);
System.out.println("Requester: sent request no " + (++cnt) + " - " + requestId);
}
position %= poem.size();
String line = poem.get(position);
int requestId = requestIds.nextInt(99999) + 1;
String request = appRequestFrom(line, requestId);
rq.write(request);
System.out.println("Requester: sent request no " + (++cnt) + " - " + requestId);
position++;
try {
Thread.sleep((long) Math.ceil((Math.pow(
delays.nextDouble(), 4) * MAX_MILLIS_BETWEEN_REQUESTS) + 1));
} catch (InterruptedException e) {
// ignore
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// ignore
}
rq.close();
} catch (MQException e) {
e.printStackTrace();
}
}
public Requester(String mqManagerName, String requestQueueName, String modelQueueName) throws MQException {
super();
System.out.println("Requester: establishing mq session (mq manager: " + mqManagerName +
"/ request queue: " + requestQueueName + " / model queue: " + modelQueueName +")");
qMgr = new MQQueueManager(mqManagerName);
// get request queue
int openOptions = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
requestQueue = qMgr.accessQueue(requestQueueName, openOptions);
// get dynamic reply queue
int inputOptions = MQOO_INPUT_EXCLUSIVE + MQOO_FAIL_IF_QUIESCING;
replyQueue = new MQQueue(qMgr,
modelQueueName,
inputOptions,
"",
queueNamePattern,
"");
replyQueueName = replyQueue.getName();
System.out.println("Requester: created temporary reply queue " + replyQueueName);
getOptions.options = MQGMO_NO_SYNCPOINT +
MQGMO_NO_WAIT +
MQGMO_FAIL_IF_QUIESCING;
// catch-up (for those replies not retrieved after a request was put)
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
// read options
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = MQGMO_NO_SYNCPOINT +
MQGMO_WAIT +
MQGMO_FAIL_IF_QUIESCING;
getOptions.waitInterval = WAIT_WHILE_EMPTY;
while(proceed()) {
try {
if (!retrieveMessage(getOptions)) {
try {
Thread.sleep(getOptions.waitInterval);
} catch (InterruptedException e1) {}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
private boolean retrieveMessage(MQGetMessageOptions getOptions) throws IOException {
MQMessage msg = new MQMessage();
try {
msg.clearMessage();
msg.seek(0);
replyQueue.get(msg, getOptions);
System.out.println("Requester: reply no " + ++rcvd + " received - id: " +
Long.parseLong(new String(msg.messageId, Charset.forName("ISO_8859_1")), 16));
byte[] buf = new byte[msg.getDataLength()];
String message = new String(buf, charset);
System.out.println("Requester: message received:\n" + message);
} catch (MQException e) {
if (e.reasonCode == MQRC_NO_MSG_AVAILABLE) {
return false;
}
}
return true;
}
public byte[] write(String message) {
int positionRequestId = 24;
int endIndex = positionRequestId + 16;
CharSequence requestId = message.substring(positionRequestId, endIndex);
StringBuffer sb = new StringBuffer("00000000");
sb.append(requestId);
byte[] id = sb.toString().getBytes(charset);
MQMessage mqMsg = new MQMessage();
mqMsg.characterSet = 819;
mqMsg.encoding = MQENC_INTEGER_NORMAL;
mqMsg.format = MQFMT_STRING;
mqMsg.messageType = MQMT_REQUEST;
mqMsg.messageId = id;
mqMsg.correlationId = id;
mqMsg.replyToQueueName = replyQueueName;
try {
mqMsg.writeString(message);
mqMsg.seek(0);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQPMO_NO_SYNCPOINT;
requestQueue.put(mqMsg, pmo);
} catch (IOException e) {
e.printStackTrace();
} catch (MQException e) {
e.printStackTrace();
}
// try to read from reply queue fail immediately
try {
retrieveMessage(getOptions);
} catch (IOException e) {
e.printStackTrace();
}
return id;
}
public void close() {
stop();
try {
Thread.sleep(2 * WAIT_WHILE_EMPTY);
} catch (InterruptedException e1) {
// ignore
}
try {
if (requestQueue != null) {
requestQueue.close();
}
if (qMgr != null) {
qMgr.disconnect();
}
} catch (MQException e) {
// ignore
}
}
public boolean proceed() {
synchronized(locker) {
return !stopped;
}
}
public void stop() {
synchronized(locker) {
stopped = true;
}
}
private static List<String> writePoem() {
List<String> poem = new ArrayList<>();
poem.add("Das Nasobem");
poem.add("von Joachim Ringelnatz");
poem.add("");
poem.add("Auf seiner Nase schreitet");
poem.add("einher das Nasobem,");
poem.add("von seineme Kind begleitet -");
poem.add("es steht noch nicht im Brehm.");
poem.add("");
poem.add("Es steht noch nicht im Meyer");
poem.add("und auch im Brockhaus nicht -");
poem.add("es tritt aus meiner Leier");
poem.add("zum ersten Mal ans Licht.");
poem.add("");
poem.add("Auf seiner Nase schreitet");
poem.add("- wie schon gesagt - seitdem");
poem.add("von seinem Kind begleitet");
poem.add("einher das Nasobem.");
poem.add("");
poem.add("");
return poem;
}
private static String iToHex(int num, int places) {
StringBuilder sb = new StringBuilder();
char[] digits = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
for (int i = 0; i < places; ++i) {
sb.append(digits[num % places]);
num /= places;
}
return sb.reverse().toString();
}
private static String iToHex(int num) {
return iToHex(num, LONG_WIDTH_IN_HEX_CHARS);
}
private static String appRequestFrom(String msgBody, int requestId) {
int headerLength = 72;
// includes message body length field here!
int bodyLength = msgBody.length();
StringBuilder sb = new StringBuilder();
sb.append("GHI "); // magic
sb.append("1"); // version major
sb.append("0"); // version minor
sb.append("0"); // flags
sb.append("1"); // app message type SYNCHRONOUS REQUEST
sb.append(iToHex(headerLength + bodyLength)); // message length
sb.append(iToHex(requestId)); // request id
sb.append(iToHex(0)); // timeout
sb.append(iToHex(bodyLength)); // message body length
sb.append(msgBody); // message body
return sb.toString();
}
}
Responder.java:
package com.hellerim.imq.comm.responder;
import static com.ibm.mq.constants.CMQC.*;
import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import io.netty.util.CharsetUtil;
import net.jcip.annotations.GuardedBy;
public class Responder
{
private MQQueueManager qMgr;
private MQQueue requestQueue;
private Map<String, MQQueue> replyQueues = new HashMap<>();
private final Object locker = new Object();
static final private int WAIT_WHILE_EMPTY = 100; // ms
@GuardedBy("locker")
private boolean stopped = false;
Thread fetcherThread = null;
private final static byte MESSAGE_TYPE_REPLY = 52; // '4'
public final static String MQ_MANAGER = "MY_QM";
public final static String REQUEST_QUEUE = "TEST.REQUESTS";
public static void main( String[] args ) throws MQException, IOException
{
System.out.println( "running reponder application" );
try {
new Responder(MQ_MANAGER, REQUEST_QUEUE).start();
} catch(Exception e) {
e.printStackTrace();
}
}
public Responder(String mqManagerName, String requestQueueName) throws MQException {
System.out.println("establishing mq session (mq manager: " + mqManagerName +
" / request queue: " + requestQueueName + ")");
qMgr = new MQQueueManager(mqManagerName);
int openOptions = MQOO_INPUT_SHARED + MQOO_FAIL_IF_QUIESCING;
requestQueue = qMgr.accessQueue(requestQueueName, openOptions);
}
public MQQueue getReplyQueue(String replyQueueName) throws MQException {
if (replyQueues.containsKey(replyQueueName)) {
return replyQueues.get(replyQueueName);
}
int openOptions = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
MQQueue replyQueue = qMgr.accessQueue(replyQueueName, openOptions);
replyQueues.put(replyQueueName, replyQueue);
System.out.println("Responder: opened dynamic reply queue");
return replyQueue;
}
private void start() throws IOException {
Runnable fetcher = new Runnable() {
@Override
public void run() {
int cnt = 0;
while(proceed()) {
MQMessage msg = new MQMessage();
try {
//msg.clearMessage();
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = MQGMO_NO_SYNCPOINT +
MQGMO_WAIT +
MQGMO_FAIL_IF_QUIESCING;
getOptions.waitInterval = WAIT_WHILE_EMPTY;
requestQueue.get(msg, getOptions);
System.out.println("Responder: message no " + ++cnt + " received");
MQQueue replyQueue = null;
try {
replyQueue = getReplyQueue(msg.replyToQueueName);
} catch(MQException e) {
if (e.completionCode == MQCC_FAILED && e.reasonCode == MQRC_UNKNOWN_OBJECT_NAME) {
// dynamic reply queue does not exist any more => message out of date
continue;
}
throw e;
}
// set message type for reply
if (msg.getDataLength() < 56) {
System.out.println("invalid message:");
System.out.println(Msg2Text(msg));
continue;
}
System.out.println(Msg2Text(msg));
int typePosition = 7;
msg.seek(typePosition);
msg.writeByte(MESSAGE_TYPE_REPLY);
msg.seek(0);
String text = Msg2Text(msg);
MQMessage msgOut = new MQMessage();
msgOut.characterSet = 819;
msgOut.encoding = MQENC_INTEGER_NORMAL;
msgOut.format = MQFMT_STRING;
msgOut.messageType = MQMT_REPLY;
msgOut.messageId = msg.messageId;
msgOut.correlationId = msg.correlationId;
msgOut.seek(0);
msgOut.writeString(text);
msgOut.seek(0);
System.out.println(text);
// System.out.println("Responder: message received");
MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults, same
pmo.options = MQPMO_NO_SYNCPOINT;
replyQueue.put(msgOut, pmo);
System.out.println("Responder: message no " + cnt + " returned");
} catch (MQException e) {
if (e.reasonCode == MQRC_NO_MSG_AVAILABLE) {
; // NOOP
} else {
try {
msg.seek(0);
System.out.println(msg);
} catch (EOFException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
e.printStackTrace();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {}
}
} catch (IOException e) {
e.printStackTrace();
}
}
shutdown();
}
};
Thread task = new Thread(fetcher);
task.run();
System.out.print("press <ENTER> to terminate ");
System.in.read();
System.out.println();
synchronized(locker) {
stopped = true;
}
try {
task.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static String Msg2Text(MQMessage msg) {
int length;
String text = "";
try {
length = msg.getDataLength();
byte[] buf = new byte[length];
msg.seek(0);
msg.readFully(buf);
text = new String(buf, CharsetUtil.ISO_8859_1);
msg.seek(0);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return text;
}
private boolean proceed() {
synchronized(locker) {
return !stopped;
}
}
private void shutdown() {
System.out.print("\nshutting down responder... ");
for (MQQueue queue : replyQueues.values()) {
try {
queue.close();
} catch (MQException e) { }
}
replyQueues.clear();
try {
qMgr.close();
} catch (MQException e) { }
System.out.println("done.");
}
}
Is there any idea what might be wrong?
Upvotes: 2
Views: 196
Reputation: 7486
If you know that your message payload is text (string) then you can do this:
String msgStr = msg.readStringOfByteLength(msg.getMessageLength());
System.out.println("Requester: message received:\n" + msgStr);
Upvotes: 0
Reputation: 46
It looks like you have created a buffer the right size for the message data and printed that, without moving the data into it.
In retrieveMessage:
byte[] buf = new byte[msg.getDataLength()];
String message = new String(buf, charset);
System.out.println("Requester: message received:\n" + message);
You might need to call the readFully (or similar) method to get the data.
Upvotes: 3