ocp1000

Reputation: 601

Consumer/Producer with ExecutorService is stuck

I'm new to multithreading and I'm trying to write the following simple program:

  1. Read a file
  2. Print the output to screen

I've created the following classes. Consumer:

public class Consumer implements Runnable {
    private BlockingQueue<String> m_Queue;

    public Consumer(BlockingQueue<String> i_Queue)
    {
        m_Queue = i_Queue;
    }

    @Override
    public void run()
    {
        try
        {
            String referenceID1;

            //Consuming message until exit message is received.
            while ((referenceID1 = m_Queue.take()) != "EOF")
            {
                System.out.println(referenceID1);
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

Producer:

public class Producer implements Runnable {
    private BlockingQueue<String> m_Queue;
    private String m_FilePath;

    public Producer(BlockingQueue<String> i_Queue, String i_FilePath)
    {
        m_Queue = i_Queue;
        m_FilePath = i_FilePath;
    }

    @Override
    public void run()
    {
        try (BufferedReader reader = new BufferedReader(new FileReader(m_FilePath)))
        {
            String line;
            while ((line = reader.readLine()) != null)
            {
                m_Queue.put(line);
                System.out.println(line + " Was added to queue.");
            }

            //Adding an exit message.
            m_Queue.put("EOF");
            System.out.println("EOF Was added to queue.");
        }
        catch (IOException | InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

ProducerConsumerService:

public class ProducerConsumerService {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        //Start the producer; Will read the file
        threadPool.execute(new Producer(queue, args[0]));

        for (int i = 0; i < 4; i++)
        {
            //Parentheses are needed; otherwise i and 1 are concatenated as text.
            System.out.println("Generating consumer " + (i + 1));
            threadPool.execute(new Consumer(queue));
        }

        try
        {
            threadPool.shutdown();
            System.out.println("Shutting down threads.");

            threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            System.out.println("Terminated successfully.");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

The input file consists of the numbers 1-20, each on its own line. When I run the program, I can see that all the numbers are read, but then the program hangs and I never see the "Terminated successfully." message.
This doesn't happen if I use a single thread for reading and a single thread for printing to the screen, but using only one consumer thread defeats the purpose of a "multi-threaded" program.
My guess is that I'm forgetting to release a resource, but I can't tell which one.

Upvotes: 1

Views: 590

Answers (1)

Nicolas Filotto

Reputation: 44985

Your problem is that you use take() in your Consumer which:

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

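You then test whether the returned value is EOF (incorrectly, by the way: strings must be compared with equals, not !=), but you put EOF into the queue only once while you have 4 consumers. One consumer takes it and exits; the other 3 stay blocked in take() forever, so the thread pool never terminates.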
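To illustrate the equals remark, here is a minimal sketch; the class and method names are hypothetical and not part of the question. It compares an "EOF" literal with a string built at runtime, which is what a line read from a file would be. In the question's code the != check most likely works only by accident, because both "EOF" values are compile-time literals that refer to the same interned object.

```java
import java.util.Objects;

// Hypothetical demo class: reference comparison versus content comparison.
class StringEqualityDemo {
    // True only if both variables point to the very same object.
    static boolean sameReference(String a, String b) {
        return a == b;
    }

    // True if both strings hold the same characters (null-safe).
    static boolean sameContent(String a, String b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        String literal = "EOF";
        // A distinct object with the same characters, like a line read at runtime.
        String built = new String("EOF");

        System.out.println(sameReference(literal, built)); // prints false
        System.out.println(sameContent(literal, built));   // prints true
    }
}
```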

So you should use poll() instead, which returns null immediately rather than waiting when the queue is empty:

while((referenceID1 = m_Queue.poll()) != null && !Objects.equals(referenceID1, "EOF"))

Or, more simply, if you get rid of EOF:

while((referenceID1 = m_Queue.poll()) != null)
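One caveat with the non-blocking poll(): because it returns null as soon as the queue is empty, a consumer that runs before the producer has queued anything will exit right away. A possible alternative, sketched below under assumptions, keeps take() and has each consumer put the EOF marker back before exiting, so a single marker stops every consumer. This is a swapped-in technique rather than the one proposed above; the class PoisonPillDemo, the method run, the in-memory list that replaces the file, and the 10-second timeout are all hypothetical choices made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: one EOF marker handed from consumer to consumer.
class PoisonPillDemo {
    static final String EOF = "EOF";

    // Runs one producer and `consumers` consumers; returns every consumed line.
    static List<String> run(List<String> lines, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> consumed = new CopyOnWriteArrayList<>();
        // One thread per task so that all consumers run concurrently.
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);

        // Producer: queue every line, then a single EOF marker.
        pool.execute(() -> {
            try {
                for (String line : lines) {
                    queue.put(line);
                }
                queue.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    String item;
                    // Block until an element arrives; compare content with equals.
                    while (!EOF.equals(item = queue.take())) {
                        consumed.add(item);
                    }
                    // Put the marker back so the next waiting consumer sees it too.
                    queue.put(EOF);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            // Assumed timeout, chosen only to keep the demo from waiting forever.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> out = run(Arrays.asList("1", "2", "3", "4", "5"), 4);
        System.out.println(out.size() + " lines consumed"); // prints "5 lines consumed"
    }
}
```

The order in which the lines are consumed is not deterministic, since four consumers compete for the queue. Another common variant is to have the producer put one EOF per consumer instead of re-queuing a single marker.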

Upvotes: 2
