Bill

Reputation: 357

About Thread's wait()/notify()

I was trying to write an example of how to use wait() and notify(), but it seems that the wait() never gets notified:

public class Transfer {
    private int[] data;
    private volatile int ptr;
    private final Object lock = new Object();

    public Transfer(int[] data) {
        this.data = data;
        this.ptr = 0;
    }

    public void send() {
        while (ptr < data.length) {
            synchronized (lock) {
                try {
                    System.out.println("-----wait");
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ptr++;
            }
        }
    }

    public void receive() {
        while (ptr < data.length) {
            synchronized (lock) {
                System.out.println("current is " + data[ptr]);
                System.out.println("-----notify");
                lock.notifyAll();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

}

// in main():

        int[] data = new int[] { 111, 222, 333, 444, 555, 666, 777, 888, 999, 000 };
        Transfer tf = new Transfer(data);
        Thread t1 = new Thread(() -> {
            tf.receive();
        });
        Thread t2 = new Thread(() -> {
            tf.send();
        });
        t2.start();
        t1.start();

but the result is:

-----wait
current is 111
-----notify
current is 111
-----notify
[endless repeat]

This is not what I expected; it should be:

current is 111
current is 222
...

Upvotes: 1

Views: 981

Answers (2)

rzwitserloot

Reputation: 102903

The problem with your code specifically is that you are holding on to your lock for far too long.

I'll first explain how wait/notify works, which is intricately connected with the concept of the monitor (synchronized), then how to do it right, and then, as an encore, why you probably don't want to use this at all: it's too low-level.

How does 'synchronized' work

When you write synchronized(x), you acquire the monitor. This operation can do one of four things (items 1 to 4 below). In all cases, x is a reference, so the reference is followed; it's about the object you find by following it.

  1. If the reference is null, this immediately throws NPE.
  2. If the object x points at has no current monitor holder, this thread becomes the owner of the monitor, the monitor count becomes 1, and code continues.
  3. If the object x points at has a monitor but it is this thread, then the monitor count is incremented and code continues.
  4. If the object x points at has a monitor but it is another thread, the thread will block until the monitor becomes available. Once it is available, some unfair dice show up, are rolled, and determine which of all threads 'fighting' to acquire the monitor will acquire it. Unfair in the sense that there are no guarantees made and the JVM is free to use any algorithm it wants to decide who 'wins'. If your code depends on fairness or some set order, your code is broken.
  5. Upon reaching the } of the synchronized block, the monitor count is decremented. If it hits 0, the monitor is released (and the fight as per #4 starts, if other threads are waiting). In other words, locks are 're-entrant' in Java. A thread can write synchronized(a){synchronized(a){}} and won't deadlock with itself.
  6. Yes, this establishes happens-before relationships as per the Java Memory Model: releasing a monitor happens-before every later acquisition of that same monitor, so any writes made by the thread that released it are visible to whichever thread wins the fight and acquires it next.
  7. A method marked as 'synchronized' is effectively equivalent to wrapping the code in synchronized(this) for instance methods, and synchronized(MyClass.class) for static methods.
  8. Monitors are not released and cannot be changed in java code* except via that } mechanism; (there is no public Thread getMonitor() {..} in j.l.Object or anywhere else) - in particular if the thread blocks for any other reason, including Thread.sleep, the monitor status does not change - your thread continues to hold on to it and thus stops all other threads from acquiring it. With one exception:

So how does wait/notify factor into this?

  1. To wait/notify on x you MUST hold the monitor of x. A bare x.notify(); that is not wrapped in a synchronized(x) block does not work: it throws IllegalMonitorStateException.
  2. When you wait(), the monitor is released, and the monitor count is remembered. A call to wait() requires two things to happen before it can return: the wait needs to end (via a timeout, an interrupt, a notify(All), or a so-called spurious wakeup, which the Object.wait contract allows), and the thread needs to acquire that monitor again. If this happens normally (via a notify), by definition this is a fight, as whoever called notify necessarily still holds that monitor.
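Both rules can be observed directly. This is a minimal sketch; the class and method names are made up for illustration. The first method calls notifyAll() without holding the monitor and catches the resulting IllegalMonitorStateException. The second parks a thread in wait() and then shows that another thread can still enter synchronized on the same object, which would block forever if wait() kept the monitor.

```java
class WaitNotifyRules {

    // Rule 1: notify/notifyAll (and wait) without holding the monitor throws.
    static boolean notifyWithoutMonitorThrows() {
        Object lock = new Object();
        try {
            lock.notifyAll(); // not inside synchronized (lock)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Rule 2: wait() releases the monitor while the thread is parked.
    static boolean waitReleasesMonitor() {
        Object lock = new Object();
        boolean[] done = {false}; // guarded by lock
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!done[0]) { // always re-check the condition after waking up
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        waiter.start();
        try {
            // Poll until the waiter is parked inside lock.wait().
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            // The waiter is still inside its synchronized block, yet this block
            // can acquire the monitor, because wait() released it.
            synchronized (lock) {
                done[0] = true;
                lock.notifyAll();
            }
            waiter.join();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("notify without monitor throws: " + notifyWithoutMonitorThrows());
        System.out.println("wait() releases the monitor: " + waitReleasesMonitor());
    }
}
```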

This then explains why your code does not work: your 'receiver' snippet holds on to the monitor while it sleeps. Move the sleep outside of the synchronized block.
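Moving the sleep out stops the receiver from hogging the monitor, but both methods still treat "I was notified" as "it's my turn", so a notify sent before the other side is waiting simply gets lost. A sketch of the question's class reworked along the lines recommended in the next section (check a condition under the monitor in a while loop, use notifyAll, never sleep while holding the monitor) might look like this. The `consumed` flag, the `delayMillis` parameter, and the `received()`/`runBoth()` helpers are assumptions added for illustration; they are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

class FixedTransfer {
    private final int[] data;
    private final long delayMillis;               // the question hard-coded 1000
    private final Object lock = new Object();
    private int ptr = 0;                          // guarded by lock
    private boolean consumed = false;             // guarded by lock: was data[ptr] printed yet?
    private final List<Integer> received = new ArrayList<>(); // guarded by lock

    FixedTransfer(int[] data, long delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }

    // Advances ptr, but only after the receiver has consumed the current element.
    void send() throws InterruptedException {
        synchronized (lock) {
            while (ptr < data.length) {
                while (!consumed) {               // wait on a condition, not on "a notify happened"
                    lock.wait();
                }
                ptr++;
                consumed = false;
                lock.notifyAll();
            }
        }
    }

    // Prints each element exactly once and sleeps WITHOUT holding the monitor.
    void receive() throws InterruptedException {
        while (true) {
            synchronized (lock) {
                while (consumed && ptr < data.length) {
                    lock.wait();
                }
                if (ptr >= data.length) {
                    return;                       // sender has finished
                }
                System.out.println("current is " + data[ptr]);
                received.add(data[ptr]);
                consumed = true;
                lock.notifyAll();
            }
            Thread.sleep(delayMillis);            // outside synchronized: the sender can proceed
        }
    }

    List<Integer> received() {
        synchronized (lock) {
            return new ArrayList<>(received);
        }
    }

    // Runs both sides on their own threads and returns what the receiver printed.
    static List<Integer> runBoth(int[] data, long delayMillis) {
        FixedTransfer tf = new FixedTransfer(data, delayMillis);
        Thread receiver = new Thread(() -> {
            try { tf.receive(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        Thread sender = new Thread(() -> {
            try { tf.send(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        sender.start();
        receiver.start();
        try {
            sender.join();
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return tf.received();
    }

    public static void main(String[] args) {
        runBoth(new int[] {111, 222, 333, 444, 555}, 1000);
    }
}
```

Because each side waits for a state change rather than for a particular notify, it no longer matters which thread starts first.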

How do you use this, generally

The best way to use wait/notifyAll is not to make too many assumptions about the 'flow' of locking and unlocking. Acquire the monitor first, and only then check some status. If the status is such that you need to wait for something to happen, then and only then start the wait() cycle. The thread that will cause that event to happen must likewise first acquire the monitor, and only then take steps to start the event. If this is not possible, that's okay: put in a failsafe by making the code that wait()s use a timeout (wait(500L), for example), so that if things fail, the while loop will fix the problem. Furthermore, there really is no good reason to ever use notify, so forget it exists. notify makes no guarantees about which waiting thread it will wake, and given that every thread that uses wait ought to re-check the condition it was waiting for regardless of why wait returned, notifyAll is always the right call to make.

So, it looks like this... let's say we're waiting for some file to exist.

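// waiting side:
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// 'lock' is a private final Object shared by both sides;
// performOperation is a placeholder for your own code.
Path target = Paths.get("/file-i-am-waiting-for.txt");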
synchronized (lock) {
   while (!Files.isRegularFile(target)) {
       try {
           lock.wait(1000L);
       } catch (InterruptedException e) {
           // this exception occurs ONLY
           // if some code explicitly called Thread.interrupt()
           // on this thread. You therefore know what it means.
           // usually, merely logging an InterruptedException is wrong!
           // let's say here you intended it to mean: just exit
           // and do nothing.

           // to be clear: Interrupted does not mean:
           // 'someone pressed CTRL+C' or 'the system is about to shutdown'.
           return;
       }
   }

   performOperation(target);
}

And on the 'file creation' side:

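import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

Path tgt = Paths.get("/file-i-am-waiting-for.txt");
Path create = tgt.getParent().resolve(tgt.getFileName() + ".create");
fillWithContent(create); // placeholder: write the full contents, outside the lock
synchronized (lock) {
    // ATOMIC_MOVE is a StandardCopyOption, not a StandardOpenOption.
    // Files.move throws IOException, so handle or declare it.
    Files.move(create, tgt, StandardCopyOption.ATOMIC_MOVE);
    lock.notifyAll();
}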

The 'sending' (notifying) side is very simple. Note how we're using the file system to ensure that if the tgt file exists at all, it's fully formed and not a half-baked product. The receiving side uses a while loop: the notification itself is NOT the signal to continue; it is merely the signal to re-check whether the file exists. This is almost always how to do this stuff. Note also that all code touching the target file only does so while holding the lock, which ensures there are no clashes on that part.
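Putting both halves together, a self-contained sketch of the same pattern might look like this. It assumes Java 11+ (for Files.readString and Files.writeString), uses a temporary directory instead of the root directory, writes real content in place of the hypothetical fillWithContent, and assumes the file system supports ATOMIC_MOVE within one directory (otherwise Files.move throws AtomicMoveNotSupportedException). The class and method names are illustrative only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class FileWaitDemo {
    private static final Object lock = new Object();

    // Waiting side: a notify is only a hint to re-check; the loop condition decides.
    static String waitForFile(Path target) throws IOException, InterruptedException {
        synchronized (lock) {
            while (!Files.isRegularFile(target)) {
                lock.wait(1000L); // timeout as a failsafe against a missed notify
            }
            return Files.readString(target);
        }
    }

    // Creating side: write a side file outside the lock, then atomically move it into place.
    static void createFile(Path target, String content) throws IOException {
        Path create = target.resolveSibling(target.getFileName() + ".create");
        Files.writeString(create, content);
        synchronized (lock) {
            Files.move(create, target, StandardCopyOption.ATOMIC_MOVE);
            lock.notifyAll();
        }
    }

    static String demo(String content) {
        try {
            Path dir = Files.createTempDirectory("wait-demo");
            Path target = dir.resolve("file-i-am-waiting-for.txt");
            String[] result = new String[1];
            Thread waiter = new Thread(() -> {
                try {
                    result[0] = waitForFile(target);
                } catch (IOException | InterruptedException e) {
                    // leave result[0] as null
                }
            });
            waiter.start();
            createFile(target, content);
            waiter.join(); // join() makes result[0] visible to this thread
            Files.delete(target);
            Files.delete(dir);
            return result[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello"));
    }
}
```

It does not matter whether the waiter checks before or after the move: if it checks first, the notifyAll wakes it; if the move wins, the while condition is already false.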

But.. this is fairly low level stuff

The java.util.concurrent package has superior tooling for this stuff; for example, you may want a latch (CountDownLatch) here, or a ReadWriteLock. These also tend to outperform hand-rolled wait/notify code.
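For the question's one-element-at-a-time handoff specifically, another java.util.concurrent tool (not one of the two named above) fits directly: a SynchronousQueue, whose put() blocks until another thread take()s the element. A minimal sketch, assuming a hypothetical end-of-stream marker (Integer.MIN_VALUE) that never occurs in the data:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueTransfer {
    // Hypothetical end-of-stream marker; assumes it never appears in the data.
    private static final int POISON = Integer.MIN_VALUE;

    static List<Integer> run(int[] data) {
        // No capacity: each put() blocks until a take() receives the element,
        // which is the handoff the question was building by hand with wait/notify.
        BlockingQueue<Integer> channel = new SynchronousQueue<>();
        List<Integer> received = new ArrayList<>(); // only touched by the receiver until join()

        Thread sender = new Thread(() -> {
            try {
                for (int value : data) {
                    channel.put(value);
                }
                channel.put(POISON); // tell the receiver there is nothing more
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread receiver = new Thread(() -> {
            try {
                for (int v = channel.take(); v != POISON; v = channel.take()) {
                    System.out.println("current is " + v);
                    received.add(v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        sender.start();
        receiver.start();
        try {
            sender.join();
            receiver.join(); // join() also makes the receiver's writes to 'received' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        run(new int[] {111, 222, 333, 444, 555, 666, 777, 888, 999, 0});
    }
}
```

There is no lock, no flag and no notify in user code; the queue does the waiting.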

But even juc is low level. Generally, threading works best if the communication channel between threads is inherently designed around concurrency. DBs (with a proper transaction isolation level, such as SERIALIZABLE) or message buses like RabbitMQ are such things. Why do you think script kiddies fresh off of an 8 hour course on PHP can manage to smash a website together that actually does at least hold up, thread-wise, even if it's littered with security issues? Because PHP enforces a model where all communication runs through a DB, since PHP is incapable of anything else in its basic deployment. As silly as these handcuffs may sound, the principle is solid, and can be applied just as easily from Java.

*) sun.misc.Unsafe can do it, but it's called Unsafe for a reason.

Some closing best practices

  • Locks should be private; this is a rule broken by most examples and a lot of java code. You've done it right: if you're going to use synchronized, it should probably be on lock, which is private final Object lock = new Object();. Make it new Object[0] if you need it to be serializable, which arrays are, and Objects aren't.
  • If there is ever code in your system that does synchronized(a) { synchronized (b) { ... }} and also code that does synchronized(b) { synchronized (a) { ... }}, you're going to run into a deadlock at some point: each thread acquires its first lock and then waits for the second, which the other thread holds. They will wait forever. Be REAL careful when acquiring more than one monitor, and if you must, put in a ton of effort to ensure that you always acquire them in the same order. Fortunately, jstack and similar tools that introspect running VMs can report deadlocks. The JVM itself, unfortunately, will just freeze in its tracks, dead as a doornail, if you deadlock it.
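One common way to enforce "always the same order" is to derive it from a unique key. The sketch below uses a hypothetical Account class with an assumed-unique numeric id: every transfer locks the lower id first, whichever direction the money flows, so two opposite transfers can never each hold one lock while waiting for the other.

```java
class Account {
    private final long id;                      // assumed unique: defines the global lock order
    private final Object lock = new Object();   // private lock, as recommended above
    private long balance;                       // guarded by lock

    Account(long id, long balance) {
        this.id = id;
        this.balance = balance;
    }

    long balance() {
        synchronized (lock) {
            return balance;
        }
    }

    // Both directions acquire the lower-id lock first.
    // If from == to, the nested block simply re-enters the same monitor.
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id <= to.id ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first.lock) {
            synchronized (second.lock) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Hammers two accounts from opposite directions. With inconsistent ordering this
    // could deadlock; with a fixed order both threads finish and no money is lost.
    static long opposingTransfersTotal(int rounds) {
        Account a = new Account(1, 1_000);
        Account b = new Account(2, 1_000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) transfer(a, b, 1);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) transfer(b, a, 1);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return a.balance() + b.balance();
    }

    public static void main(String[] args) {
        System.out.println(opposingTransfersTotal(100_000));
    }
}
```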

Upvotes: 2

Trashier Park

Reputation: 46

class Transfer {
    private int[] data;
    private volatile int ptr;
    private final Object lock = new Object();

    public Transfer(int[] data) {
        this.data = data;
        this.ptr = 0;
    }

    public void send() {
        while (ptr < data.length) {
            synchronized (lock) {
                try {
                    System.out.println("-----wait");
                    lock.notifyAll();
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ptr++;
            }
        }
        // Wake the receiver one last time; otherwise it stays in lock.wait()
        // forever after the final element and the program never exits.
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    public void receive() {
        while (ptr < data.length) {
            synchronized (lock) {
                System.out.println("current is " + data[ptr]);
                System.out.println("-----notify");
                try {
                    lock.notifyAll();
                    lock.wait();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

"Thread.sleep" does not release the lock, so you need "lock.wait" to release the lock and let the other thread proceed. Then, after "send" increments the pointer, it should also notify, so that the other thread, which is stuck in "receive", can proceed.

Upvotes: 0
