hellerim
hellerim

Reputation: 187

Why do I get zero-filled messages from a dynamic queue?

A requester is sending messages over a normal queue to a responder, indicating a dynamic queue it created as a reply queue. The responder puts these same messages on the reply queue. The responder retrieves all messages correctly.

For each message sent the requester obtains a message from the reply queue, but its body is filled with zeroes. Both programs are written in Java, using com.ibm.mq.allclient-9.2.2.0.jar. When I wrote the same in JavaScript with Node.js and ibmmq for node, everything worked fine.

Requester.java:

package com.hellerim.imq.comm.requester;

import static com.ibm.mq.constants.CMQC.MQENC_INTEGER_NORMAL;
import static com.ibm.mq.constants.CMQC.MQFMT_STRING;
import static com.ibm.mq.constants.CMQC.MQGMO_FAIL_IF_QUIESCING;
import static com.ibm.mq.constants.CMQC.MQGMO_NO_SYNCPOINT;
import static com.ibm.mq.constants.CMQC.MQGMO_NO_WAIT;
import static com.ibm.mq.constants.CMQC.MQGMO_WAIT;
import static com.ibm.mq.constants.CMQC.MQMT_REQUEST;
import static com.ibm.mq.constants.CMQC.MQOO_FAIL_IF_QUIESCING;
import static com.ibm.mq.constants.CMQC.MQOO_INPUT_EXCLUSIVE;
import static com.ibm.mq.constants.CMQC.MQOO_OUTPUT;
import static com.ibm.mq.constants.CMQC.MQPMO_NO_SYNCPOINT;
import static com.ibm.mq.constants.CMQC.MQRC_NO_MSG_AVAILABLE;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

import io.netty.util.CharsetUtil;
import net.jcip.annotations.GuardedBy;

public class Requester 
{
    private static final int WAIT_WHILE_EMPTY = 100; // ms
    private static int MAX_MILLIS_BETWEEN_REQUESTS = 100;
    private static int LONG_WIDTH_IN_HEX_CHARS = 16;
    private static final Charset charset = CharsetUtil.ISO_8859_1;
    private MQQueueManager qMgr;
    private final MQQueue requestQueue;
    private final String queueNamePattern = "TEST.SESSION.*";
    private String replyQueueName;
    private final MQQueue replyQueue;
    private final MQGetMessageOptions getOptions = new MQGetMessageOptions(); 

    private static final String MQ_MANAGER = "MY_QM";
    private static final String REQUEST_QUEUE = "TEST.REQUESTS";
    private static final String MODEL_QUEUE = "TEST.SESSION.MODEL";
    
    final private Object locker = new Object();
    @GuardedBy("this")
    boolean stopped = false;
    
    int rcvd = 0;
    
    /**
     * Demo driver. Opens a {@link Requester}, sends 50 requests whose bodies are
     * single poem lines taken cyclically, and additionally sends the whole poem
     * (lines joined by newlines) once, in the round whose index equals the number
     * of poem lines. After the loop it waits two seconds and closes the requester.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final Requester requester;
        try {
            requester = new Requester(MQ_MANAGER, REQUEST_QUEUE, MODEL_QUEUE);
        } catch (MQException e) {
            e.printStackTrace();
            return;
        }

        final List<String> verses = writePoem();
        final Random idSource = new Random();
        final Random delaySource = new Random(1000); // fixed seed
        int sent = 0;

        for (int round = 0; round < 50; round++) {
            if (round == verses.size()) {
                // one extra request carrying the complete poem
                final int poemRequestId = idSource.nextInt(99999) + 1;
                requester.write(appRequestFrom(String.join("\n", verses), poemRequestId));
                sent++;
                System.out.println("Requester: sent request no " + sent + " -  " + poemRequestId);
            }

            final String verse = verses.get(round % verses.size());
            final int requestId = idSource.nextInt(99999) + 1;
            requester.write(appRequestFrom(verse, requestId));
            sent++;
            System.out.println("Requester: sent request no " + sent + " -  " + requestId);

            // fourth power skews the uniform sample towards short pauses
            final double skewed = Math.pow(delaySource.nextDouble(), 4);
            pause((long) Math.ceil((skewed * MAX_MILLIS_BETWEEN_REQUESTS) + 1));
        }

        pause(2000);
        requester.close();
    }

    /** Sleeps for the given number of milliseconds; an interrupt just ends the sleep early. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // ignore
        }
    }

    /**
     * Connects to the queue manager, opens the request queue for output, creates a
     * dynamic reply queue from the given model queue and starts a background task
     * that keeps polling the reply queue until {@link #stop()} is called.
     *
     * @param mqManagerName    name of the queue manager to connect to
     * @param requestQueueName name of the queue requests are put on
     * @param modelQueueName   name of the model queue the dynamic reply queue is created from
     * @throws MQException if connecting or opening either queue fails
     */
    public Requester(String mqManagerName, String requestQueueName, String modelQueueName) throws MQException {
        super();
        System.out.println("Requester: establishing mq session (mq manager: " + mqManagerName +
            "/ request queue: " + requestQueueName + " / model queue: " + modelQueueName +")");
        qMgr = new MQQueueManager(mqManagerName);
        // get request queue
        int openOptions = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
        requestQueue = qMgr.accessQueue(requestQueueName, openOptions);
        // get dynamic reply queue
        int inputOptions = MQOO_INPUT_EXCLUSIVE + MQOO_FAIL_IF_QUIESCING;
        replyQueue = new MQQueue(qMgr,
                modelQueueName,
                inputOptions,
                "",
                queueNamePattern,
                "");
        replyQueueName = replyQueue.getName();
        System.out.println("Requester: created temporary reply queue " + replyQueueName);
        // options used by write() for its immediate, non-blocking read attempt
        getOptions.options = MQGMO_NO_SYNCPOINT +
                  MQGMO_NO_WAIT +
                  MQGMO_FAIL_IF_QUIESCING;

        // catch-up (for those replies not retrieved after a request was put)
        java.util.concurrent.ExecutorService catchUp = Executors.newSingleThreadExecutor();
        catchUp.execute(new Runnable() {

            @Override
            public void run() {
                // blocking read options, local to the catch-up task
                MQGetMessageOptions waitingGet = new MQGetMessageOptions();
                waitingGet.options = MQGMO_NO_SYNCPOINT +
                      MQGMO_WAIT +
                      MQGMO_FAIL_IF_QUIESCING;
                waitingGet.waitInterval = WAIT_WHILE_EMPTY;
                while (proceed()) {
                    try {
                        if (!retrieveMessage(waitingGet)) {
                            try {
                                Thread.sleep(waitingGet.waitInterval);
                            } catch (InterruptedException e1) {
                                // keep the interrupt visible and end the task
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // The task above still runs to completion, but once it returns the
        // worker thread may terminate. Without shutdown() the executor's
        // non-daemon worker would stay parked and keep the JVM alive.
        catchUp.shutdown();
    }
    
    /**
     * Performs one get on the reply queue with the given options and prints what
     * it received.
     *
     * @param getOptions get options passed straight to {@code replyQueue.get}
     * @return {@code false} only when the get failed with
     *         {@code MQRC_NO_MSG_AVAILABLE}; {@code true} in every other case,
     *         including any other {@link MQException}, which is caught here and
     *         neither logged nor rethrown
     * @throws IOException from the {@link MQMessage} cursor/length calls
     */
    private boolean retrieveMessage(MQGetMessageOptions getOptions) throws IOException {
        MQMessage msg = new MQMessage();
        try {
            msg.clearMessage();
            msg.seek(0);
            replyQueue.get(msg, getOptions);
            // the message id bytes are decoded as text and parsed as a hexadecimal number
            System.out.println("Requester: reply no " + ++rcvd + " received - id: " +
                Long.parseLong(new String(msg.messageId, Charset.forName("ISO_8859_1")), 16));
            // NOTE(review): buf is sized from the message but nothing in this method
            // copies message data into it, so the string below is built from a
            // freshly allocated (all-zero) array.
            byte[] buf = new byte[msg.getDataLength()];
            String message = new String(buf, charset);
            System.out.println("Requester: message received:\n" + message);
        } catch (MQException e) {
            if (e.reasonCode == MQRC_NO_MSG_AVAILABLE) {
                return false;
            }
            // any other reason code falls through to "return true"
        }
        return true;
    }
    
    /**
     * Puts {@code message} on the request queue as an MQ request whose reply-to
     * queue is this requester's dynamic queue, then makes one non-blocking attempt
     * to read a reply.
     *
     * <p>The MQ message id and correlation id are both set to the 16 characters
     * found at offset 24 of {@code message}, prefixed with eight {@code '0'}
     * characters and encoded with the class charset. Failures while writing or
     * putting are printed, not thrown.
     *
     * @param message the complete application message (header plus body)
     * @return the 24 id bytes that were set on the MQ message
     */
    public byte[] write(String message) {
        final int idStart = 24;
        final int idEnd = idStart + 16;
        final String paddedId = "00000000" + message.substring(idStart, idEnd);
        final byte[] id = paddedId.getBytes(charset);

        final MQMessage outgoing = new MQMessage();
        outgoing.characterSet = 819;
        outgoing.encoding = MQENC_INTEGER_NORMAL;
        outgoing.format = MQFMT_STRING;
        outgoing.messageType = MQMT_REQUEST;
        outgoing.messageId = id;
        outgoing.correlationId = id;
        outgoing.replyToQueueName = replyQueueName;

        try {
            outgoing.writeString(message);
            outgoing.seek(0);
            final MQPutMessageOptions putOptions = new MQPutMessageOptions();
            putOptions.options = MQPMO_NO_SYNCPOINT;
            requestQueue.put(outgoing, putOptions);
        } catch (IOException | MQException e) {
            e.printStackTrace();
        }

        // one immediate read attempt; the field-level get options do not wait
        try {
            retrieveMessage(getOptions);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return id;
    }
    
    /**
     * Stops the background reader, gives it two polling intervals to finish, then
     * closes both queues and disconnects from the queue manager.
     *
     * <p>Each cleanup step runs independently, so a failure in one does not skip
     * the others. Failures are reported on {@code System.err} and never thrown.
     */
    public void close() {
        stop();
        try {
            Thread.sleep(2 * WAIT_WHILE_EMPTY);
        } catch (InterruptedException e1) {
            // carry on with cleanup, but keep the interrupt visible to the caller
            Thread.currentThread().interrupt();
        }
        closeAndReport(requestQueue, "request queue");
        closeAndReport(replyQueue, "reply queue");
        if (qMgr != null) {
            try {
                qMgr.disconnect();
            } catch (MQException e) {
                System.err.println("Requester: disconnect failed: " + e);
            }
        }
    }

    /** Closes {@code queue} if it is non-null; a failure is reported, not thrown. */
    private static void closeAndReport(MQQueue queue, String label) {
        if (queue == null) {
            return;
        }
        try {
            queue.close();
        } catch (MQException e) {
            System.err.println("Requester: closing " + label + " failed: " + e);
        }
    }
    
    /**
     * Tells whether the background reader should keep running.
     *
     * @return {@code true} while the stop flag has not been set; the flag is read
     *         while holding {@code locker}
     */
    public boolean proceed() {
        final boolean keepGoing;
        synchronized (locker) {
            keepGoing = !stopped;
        }
        return keepGoing;
    }
    
    /**
     * Sets the stop flag while holding {@code locker}; {@link #proceed()} returns
     * {@code false} from then on.
     */
    public void stop() {
        synchronized (locker) {
            this.stopped = true;
        }
    }
    
    /**
     * Builds the sample payload: the lines of the poem, including the empty
     * separator lines, in order.
     *
     * @return a new mutable list with one entry per line
     */
    private static List<String> writePoem() {
        final String[] lines = {
            "Das Nasobem",
            "von Joachim Ringelnatz",
            "",
            "Auf seiner Nase schreitet",
            "einher das Nasobem,",
            "von seineme Kind begleitet -",
            "es steht noch nicht im Brehm.",
            "",
            "Es steht noch nicht im Meyer",
            "und auch im Brockhaus nicht -",
            "es tritt aus meiner Leier",
            "zum ersten Mal ans Licht.",
            "",
            "Auf seiner Nase schreitet",
            "- wie schon gesagt - seitdem",
            "von seinem Kind begleitet",
            "einher das Nasobem.",
            "",
            "",
        };
        final List<String> poem = new ArrayList<>();
        for (String line : lines) {
            poem.add(line);
        }
        return poem;
    }

    /**
     * Formats {@code num} as upper-case hexadecimal, left-padded with zeros to
     * exactly {@code places} characters. Only the lowest {@code places} hex digits
     * are kept. Negative numbers are rendered from their two's-complement bits
     * (an unsigned shift is used, so positions beyond the 32 bits of an
     * {@code int} are zero).
     *
     * <p>The radix is always 16. The field width must not be used as the divisor:
     * that only yields hexadecimal when {@code places} happens to be 16.
     *
     * @param num    value to format
     * @param places number of characters to produce; values &lt;= 0 give {@code ""}
     * @return the fixed-width hexadecimal string
     */
    private static String iToHex(int num, int places) {
        if (places <= 0) {
            return "";
        }
        final String hexDigits = "0123456789ABCDEF";
        final char[] out = new char[places];
        // fill from the least significant digit backwards
        for (int i = places - 1; i >= 0; --i) {
            out[i] = hexDigits.charAt(num & 0xF);
            num >>>= 4;
        }
        return new String(out);
    }
    
    /**
     * Formats {@code num} as hexadecimal using the default field width
     * {@code LONG_WIDTH_IN_HEX_CHARS}.
     *
     * @param num value to format
     * @return the fixed-width hexadecimal string
     */
    private static String iToHex(int num) {
        return iToHex(num, LONG_WIDTH_IN_HEX_CHARS);
    }
    

    /**
     * Builds an application request: a fixed text header followed by the body.
     *
     * <p>Header layout as produced here: the 4-character magic {@code "GHI "},
     * four single-character fields (version major, version minor, flags, app
     * message type) and four hex fields written with {@code iToHex(int)}
     * (total message length, request id, timeout, body length).
     *
     * @param msgBody   the message body; its {@code length()} is used as body length
     * @param requestId the request id placed in the header
     * @return header and body concatenated
     */
    private static String appRequestFrom(String msgBody, int requestId) {
        final int headerLength = 72;
        final int bodyLength = msgBody.length();
        return String.join("",
                "GHI ",                                // magic
                "1",                                   // version major
                "0",                                   // version minor
                "0",                                   // flags
                "1",                                   // app message type SYNCHRONOUS REQUEST
                iToHex(headerLength + bodyLength),     // message length
                iToHex(requestId),                     // request id
                iToHex(0),                             // timeout
                iToHex(bodyLength),                    // message body length
                msgBody);                              // message body
    }
    

}

Responder.java:

package com.hellerim.imq.comm.responder;

import static com.ibm.mq.constants.CMQC.*;

import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

import io.netty.util.CharsetUtil;
import net.jcip.annotations.GuardedBy;

public class Responder
{
    
    private MQQueueManager qMgr;
    private MQQueue requestQueue;
    private Map<String, MQQueue> replyQueues = new HashMap<>();
    private final Object locker = new Object();
    static final private int WAIT_WHILE_EMPTY = 100; // ms
    @GuardedBy("locker")
    private boolean stopped = false;
    Thread fetcherThread = null;
    private final static byte MESSAGE_TYPE_REPLY = 52; // '4'
    
    public final static String MQ_MANAGER = "MY_QM";
    public final static String REQUEST_QUEUE = "TEST.REQUESTS";
    
    /**
     * Entry point: announces start-up, creates a {@link Responder} for the default
     * queue manager and request queue and starts it. Any exception raised while
     * doing so is printed rather than propagated.
     *
     * @param args ignored
     */
    public static void main( String[] args ) throws MQException, IOException
    {
        System.out.println( "running reponder application" );
        try {
            final Responder responder = new Responder(MQ_MANAGER, REQUEST_QUEUE);
            responder.start();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
    
    /**
     * Connects to the queue manager and opens the request queue for shared input.
     *
     * @param mqManagerName    name of the queue manager to connect to
     * @param requestQueueName name of the queue requests are read from
     * @throws MQException if connecting or opening the queue fails
     */
    public Responder(String mqManagerName, String requestQueueName) throws MQException {
        final String banner = "establishing mq session (mq manager: " + mqManagerName
                + " / request queue: " + requestQueueName + ")";
        System.out.println(banner);
        this.qMgr = new MQQueueManager(mqManagerName);
        this.requestQueue = this.qMgr.accessQueue(requestQueueName,
                MQOO_INPUT_SHARED + MQOO_FAIL_IF_QUIESCING);
    }
    
    /**
     * Returns the output handle for the named reply queue, opening it on first use
     * and remembering it in {@code replyQueues} for later calls.
     *
     * @param replyQueueName name of the reply queue
     * @return the cached or newly opened queue
     * @throws MQException if the queue cannot be opened
     */
    public MQQueue getReplyQueue(String replyQueueName) throws MQException {

        final MQQueue known = replyQueues.get(replyQueueName);
        if (known != null) {
            return known;
        }

        final MQQueue opened = qMgr.accessQueue(replyQueueName,
                MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING);
        replyQueues.put(replyQueueName, opened);
        System.out.println("Responder: opened dynamic reply queue");
        return opened;
    }
    
    /**
     * Starts the request/reply loop on a background thread and blocks until the
     * user presses ENTER, then signals the loop to stop and waits for it to finish.
     *
     * <p>For every request received the loop looks up the reply queue named in the
     * request, changes the character at index 7 of the payload to
     * {@code MESSAGE_TYPE_REPLY} and puts the resulting text on the reply queue
     * with the request's message id and correlation id. When the loop ends it
     * calls {@code shutdown()}.
     *
     * @throws IOException if reading from standard input fails
     */
    private void start() throws IOException {

        Runnable fetcher = new Runnable() {

            @Override
            public void run() {
                // the get options never change, so build them once
                MQGetMessageOptions getOptions = new MQGetMessageOptions();
                getOptions.options = MQGMO_NO_SYNCPOINT +
                  MQGMO_WAIT +
                  MQGMO_FAIL_IF_QUIESCING;
                getOptions.waitInterval = WAIT_WHILE_EMPTY;

                int cnt = 0;
                while (proceed()) {
                    MQMessage msg = new MQMessage();
                    try {
                        requestQueue.get(msg, getOptions);
                        System.out.println("Responder: message no " + ++cnt + " received");

                        MQQueue replyQueue = null;
                        try {
                            replyQueue = getReplyQueue(msg.replyToQueueName);
                        } catch (MQException e) {
                            if (e.completionCode == MQCC_FAILED && e.reasonCode == MQRC_UNKNOWN_OBJECT_NAME) {
                                // dynamic reply queue does not exist any more => message out of date
                                continue;
                            }
                            throw e;
                        }
                        // payloads shorter than 56 bytes are rejected
                        // (presumably the minimum header size - TODO confirm)
                        if (msg.getDataLength() < 56) {
                            System.out.println("invalid message:");
                            System.out.println(Msg2Text(msg));
                            continue;
                        }
                        System.out.println(Msg2Text(msg));
                        // set message type for reply: overwrite the byte at index 7
                        int typePosition = 7;
                        msg.seek(typePosition);
                        msg.writeByte(MESSAGE_TYPE_REPLY);
                        msg.seek(0);

                        String text = Msg2Text(msg);
                        MQMessage msgOut = new MQMessage();
                        msgOut.characterSet = 819;
                        msgOut.encoding = MQENC_INTEGER_NORMAL;
                        msgOut.format = MQFMT_STRING;
                        msgOut.messageType = MQMT_REPLY;
                        msgOut.messageId = msg.messageId;
                        msgOut.correlationId = msg.correlationId;
                        msgOut.seek(0);
                        msgOut.writeString(text);
                        msgOut.seek(0);
                        System.out.println(text);

                        MQPutMessageOptions pmo = new MQPutMessageOptions();
                        pmo.options = MQPMO_NO_SYNCPOINT;
                        replyQueue.put(msgOut, pmo);
                        System.out.println("Responder: message no " + cnt + " returned");
                    } catch (MQException e) {
                        if (e.reasonCode != MQRC_NO_MSG_AVAILABLE) {
                            // anything but "queue empty": show what we have, then back off
                            try {
                                msg.seek(0);
                                System.out.println(msg);
                            } catch (EOFException e2) {
                                e2.printStackTrace();
                            }
                            e.printStackTrace();
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e1) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                shutdown();
            }

        };

        Thread task = new Thread(fetcher);
        // start(), not run(): run() would execute the loop on this thread, so the
        // prompt below would never be reached and the loop could never be stopped.
        task.start();
        System.out.print("press <ENTER> to terminate ");
        System.in.read();
        System.out.println();
        synchronized(locker) {
            stopped = true;
        }
        try {
            task.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * Returns the complete payload of {@code msg} decoded as ISO-8859-1 text and
     * leaves the message cursor at position 0.
     *
     * <p>The cursor is rewound before the length is taken, so the whole payload is
     * read regardless of where the cursor was on entry.
     *
     * @param msg the message to read
     * @return the payload text, or {@code ""} if reading failed (the failure is printed)
     */
    private static String Msg2Text(MQMessage msg) {

        String text = "";
        try {
            // rewind first: the length is then measured from the start of the data
            msg.seek(0);
            byte[] buf = new byte[msg.getDataLength()];
            msg.readFully(buf);
            text = new String(buf, CharsetUtil.ISO_8859_1);
            msg.seek(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return text;
    }
    
    /**
     * Tells whether the fetch loop should keep running.
     *
     * @return {@code true} while the stop flag has not been set; the flag is read
     *         while holding {@code locker}
     */
    private boolean proceed() {
        final boolean keepGoing;
        synchronized (locker) {
            keepGoing = !stopped;
        }
        return keepGoing;
    }
    
    /**
     * Releases all MQ resources: closes every cached reply queue and the request
     * queue, then disconnects from the queue manager.
     *
     * <p>Every step is attempted even if an earlier one fails. Failures are
     * reported on {@code System.err} and never thrown.
     */
    private void shutdown() {

        System.out.print("\nshutting down responder... ");

        for (MQQueue queue : replyQueues.values()) {
            closeAndReport(queue);
        }
        replyQueues.clear();
        closeAndReport(requestQueue);
        if (qMgr != null) {
            try {
                // disconnect() ends the connection, matching Requester.close()
                qMgr.disconnect();
            } catch (MQException e) {
                System.err.println("Responder: disconnect failed: " + e);
            }
        }

        System.out.println("done.");
    }

    /** Closes {@code queue} if it is non-null; a failure is reported, not thrown. */
    private static void closeAndReport(MQQueue queue) {
        if (queue == null) {
            return;
        }
        try {
            queue.close();
        } catch (MQException e) {
            System.err.println("Responder: closing queue failed: " + e);
        }
    }

}

Does anyone have an idea what might be wrong?

Upvotes: 2

Views: 196

Answers (2)

Roger
Roger

Reputation: 7486

If you know that your message payload is text (string) then you can do this:

String msgStr = msg.readStringOfByteLength(msg.getMessageLength());
System.out.println("Requester: message received:\n" + msgStr);

Upvotes: 0

Rich
Rich

Reputation: 46

It looks like you have created a buffer the right size for the message data and printed that, without moving the data into it.

In retrieveMessage:

byte[] buf = new byte[msg.getDataLength()];
String message = new String(buf, charset);
System.out.println("Requester: message received:\n" + message);

You might need to call the readFully (or similar) method to get the data.

Upvotes: 3

Related Questions