Vivere
Vivere

Reputation: 2280

Java producer consumer stop consumer threads

I have this piece of code and I want a good method to stop the consumer threads:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

public class Foo {
    private final Queue<Object> queue;
    private final AtomicBoolean doneReading;
    private final int numberOfThreads = 4, N = 100;

    public Foo() {
        queue = new ArrayDeque<>();
        doneReading = new AtomicBoolean(false);
    }

    public void execute() throws InterruptedException {
        Thread[] threads = new Thread[numberOfThreads];
        for (int i = 0; i < numberOfThreads; i++) {
            threads[i] = new Thread(() -> {
                try {
                    synchronized (queue) {
                        while (!doneReading.get() || !queue.isEmpty()) {
                            if (queue.isEmpty()) {
                                queue.wait();
                                if (!queue.isEmpty()) {
                                    Object element = queue.remove();
                                    // Do stuff
                                }
                            }
                            else {
                                Object element = queue.remove();
                                // Do stuff
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threads[i].start();
        }

        for (int i = 0; i < N; i++) {
            synchronized (queue) {
                queue.add(new Object());
                queue.notifyAll();
            }
        }

        doneReading.set(true);
        synchronized (queue) {
            queue.notifyAll();
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }
}

Basically, when I have read all the data that needs to be processed, I want the consumer threads to stop. I tried while(!doneReading.get()) but this does not guarantee that there aren't any leftover items in the queue. I added !queue.isEmpty(), but in this case some threads keep on waiting even though they won't receive any notification. So I managed that I should call notifyAll() once more. This does seem to work. I also thought of adding a null in the queue, and whenever the consumer reads a null, it exits the while. Which method is better, or are there any better ideas?

Upvotes: 3

Views: 761

Answers (2)

Alexei Kaigorodov
Alexei Kaigorodov

Reputation: 13535

class LimitedQueue<T> {
      ArrayDeque<T>  queue = new ArrayDeque<>();
      boolean done = false;

      synchronized void add (T item) {
            queue.add(item);
            notifyAll();
      }

     synchronized void done() 
            done=true;
             notifyAll();
       }

      // most complex method
      // waits until next item or done signal is put
       synchronized boolean isDone() {
             for (;;) {
                  if (!queue.isEmpty(){
                        return false;
                    }
                  if (done) {
                      return true;
                   }
                   wait();
             }
      }

       syncronized T remove() {
              return deque.remove();
       }
 }

 LimitedQueue<Object> queue = new LimitedQueue<>();

  class ConsumerThread extends Thread {
          
         public void run(){
                while (!queue.isDone()) {
                        Object element = queue.remove();
                         // do stuff
               }
       }
 }

 class ProducerThread extends Thread {
        public void run() {
              for (int i = 0; i < N; i++) ,{
                    queue.add(new Object());
              }
              queue.done();
         }
    }

Upvotes: 1

markspace
markspace

Reputation: 11030

One usual method is a "poison pill". Put a special value in the queue that when read kills the consumer threads. This allows them to process all of the values and not stop until they read past the final value and read the poison pill.

Some more info: https://java-design-patterns.com/patterns/poison-pill/

I also like these websites, they often have thoughtful information on Java programming:

https://mkyong.com/java/java-blockingqueue-examples/

https://www.baeldung.com/java-blocking-queue

Upvotes: 2

Related Questions