RYZ34M
RYZ34M

Reputation: 1

Why does the same thread keeps locking (reentrant lock) unless I use a sleep?

So, in the code below I'm trying to code a simple client with a demultiplexer.

I have a thread responsible for receiving and storing in queues all TCP packets. This thread is obviously in a loop, where it locks the map that contains the queues, stores the packet in the correct queue and signals the thread waiting for the packet, and then unlocks the map. But I've came across a problem, this thread keeps locking the map while the other threads just sit and watch, unless I put the thread to sleep for a small time.

Can you help me understand what is happening?

Thank you, and here's the Client Code:

public class Client {

    private static class Entry{
        private final Deque<Frame> queue = new ArrayDeque<>();
        private final Condition cond;

        public Entry(ReentrantLock lock){ cond = lock.newCondition(); }

        public void addMsg(Frame frame){
            queue.add(frame);
        }

        public Frame getMsg(){
            return queue.poll();
        }

        public Condition getCond(){
            return cond;
        }

        @Override
        public String toString() {
            return "Entry{" +
                    "queue=" + queue +
                    '}';
        }
    }

    private static class Demultiplexer {
        private TaggedConnection tc;
        private IOException ioe            = null;
        private boolean keepDemultiplexing = true;
        private Map<Integer, Entry> queues = new HashMap<>();
        private ReentrantLock rtlock       = new ReentrantLock();

        public Demultiplexer(TaggedConnection tc) throws IOException {
            this.tc = tc;
        }

        public void start() {

            Thread demultiplexingThread = new Thread(() -> {
                while (keepDemultiplexing) {
                    try {
                        rtlock.lock();

                        //Receive frame
                        Frame frame = tc.receive();

                        if(frame != null) {
                            //Get entry
                            int number = frame.getNumber();
                            Entry entry = queues.get(number);

                            //Ignores messages without a corresponding entry
                            if (entry != null) {
                                entry.addMsg(frame);
                                entry.getCond().signal();
                            }
                        }
                    } catch (IOException ioe) {
                        this.ioe = ioe;
                        keepDemultiplexing = false;
                        queues.values().forEach( entry -> entry.getCond().signal() );
                    } finally { rtlock.unlock(); }

                    //TODO - why do i need this sleep, so that the other threads can lock?
                    try { Thread.sleep(10); }
                    catch (InterruptedException ignored) {}
                }

                try { tc.close(); }
                catch (IOException ioe) { this.ioe = ioe; }
            });

            demultiplexingThread.start();
        }

        public Frame receive(int number) throws IOException {
            Frame frame = null;

            try {
                rtlock.lock();

                Entry entry = queues.get(number);

                //Entry has to be created before expecting a receive
                if (entry != null) {
                    while ((frame = entry.getMsg()) == null) {
                        entry.getCond().await();
                        if (ioe != null) { throw ioe; }
                    }
                }
            } catch (InterruptedException ignored) {
            } finally { rtlock.unlock(); }

            return frame;
        }

        public void send(Frame frame, boolean oneWay) throws IOException {

            if (frame == null) throw new IllegalArgumentException();

            int number = frame.getNumber();

            try {
                rtlock.lock();

                //Verifies the existence of the entry needed to receive the answers
                if (!oneWay) {
                    Entry entry = queues.get(number);
                    if (entry == null) {
                        entry = new Entry(rtlock);
                        queues.put(number, entry);
                    }
                }

                tc.send(frame);

            } finally { rtlock.unlock(); }
        }

        public void finishedReceivingMessages(int number){
            try{
                rtlock.lock();
                queues.remove(number);
            }finally { rtlock.unlock(); }
        }

        public void close() {
            keepDemultiplexing = false;
        }
    }

    public static void main(String[] args) throws Exception {
        Socket s = new Socket("localhost", 12345);
        s.setSoTimeout(100);
        Demultiplexer m = new Demultiplexer(new TaggedConnection(s));

        Thread[] threads = {

                new Thread(() -> {
                    try  {
                        // send request
                        m.send(new Frame(0,1, "Ola".getBytes()),false);
                        Thread.sleep(100);
                        // get reply
                        Frame frame = m.receive(0);
                        System.out.println("(1) Reply: " + new String(frame.getData()));
                        m.finishedReceivingMessages(0);
                    }  catch (Exception ignored) {}
                }),

                new Thread(() -> {
                    try  {
                        // send request
                        m.send(new Frame(1,3, "Hello".getBytes()),false);
                        Thread.sleep(100);
                        // get reply
                        Frame frame = m.receive(1);
                        System.out.println("(2) Reply: " + new String(frame.getData()));
                        m.finishedReceivingMessages(1);
                    }  catch (Exception ignored) {}
                }),

                new Thread(() -> {
                    try  {
                        // One-way
                        m.send(new Frame(2,0, ":-p".getBytes()),true);
                    }  catch (Exception ignored) {}
                }),

                new Thread(() -> {
                    try  {
                        // Get stream of messages until empty msg
                        m.send(new Frame(3,2, "ABCDE".getBytes()),false);
                        for (;;) {
                            Frame frame = m.receive(3);
                            if (frame.getData().length == 0)
                                break;
                            System.out.println("(4) From stream: " + new String(frame.getData()));
                        }
                        m.finishedReceivingMessages(3);
                    } catch (Exception ignored) {}
                }),

                new Thread(() -> {
                    try  {
                        // Get stream of messages until empty msg
                        m.send(new Frame(4,4, "123".getBytes()),false);
                        for (;;) {
                            Frame frame = m.receive(4);
                            if (frame.getData().length == 0)
                                break;
                            System.out.println("(5) From stream: " + new String(frame.getData()));
                        }
                        m.finishedReceivingMessages(4);
                    } catch (Exception ignored) {}
                })

        };

        m.start();
        for (Thread t: threads) t.start();
        for (Thread t: threads) t.join();
        m.close();
    }
}
public class TaggedConnection implements AutoCloseable {
    private Socket socket;
    private DataOutputStream dos;
    private DataInputStream dis;
    private ReentrantLock rLock = new ReentrantLock();
    private ReentrantLock wLock = new ReentrantLock();

    public TaggedConnection(Socket socket) throws IOException {
        this.socket = socket;
        this.dos    = new DataOutputStream(socket.getOutputStream());
        this.dis    = new DataInputStream(socket.getInputStream());
    }

    public void send(Frame frame) throws IOException {
        send(frame.getNumber(), frame.getTag(),frame.getData());
    }

    public void send(int number, int tag, byte[] data) throws IOException {
        wLock.lock();
        try {
            dos.writeInt(number);
            dos.writeInt(tag);
            dos.writeInt(data.length);
            dos.write(data);
            dos.flush();
        } finally { wLock.unlock(); }
    }

    public Frame receive() throws IOException {
        int number, tag, dataSize;
        byte[] data;

        try{
            rLock.lock();
            number   = dis.readInt();
            tag      = dis.readInt();
            dataSize = dis.readInt();
            data     = new byte[dataSize];
            dis.readFully(data);
        } catch (SocketTimeoutException ste){ return null;
        } finally { rLock.unlock(); }

        return new Frame(number, tag, data);
    }

    public void close() throws IOException {
        socket.shutdownInput();
        socket.shutdownOutput();
        socket.close();
    }
}
public class Frame {
    private final int number;
    private final int tag;
    private final byte[] data;

    public Frame(int number, int tag, byte[] data) {
        this.number = number; this.tag = tag; this.data = data;
    }

    public int getNumber() {
        return number;
    }

    public int getTag() {
        return tag;
    }

    public byte[] getData() {
        return data.clone();
    }
}

Upvotes: 0

Views: 147

Answers (0)

Related Questions