Reputation: 601
I'm new to multi threading and I'm trying to write the following simple program:
I've created the following Classes: Consumer
public class Consumer implements Runnable {
private BlockingQueue<String> m_Queue;
public Consumer(BlockingQueue<String> i_Queue)
{
m_Queue = i_Queue;
}
@Override
public void run()
{
try
{
String referenceID1;
//Consuming message until exit message is received.
while((referenceID1 = m_Queue.take()) !="EOF")
{
System.out.println(referenceID1);
}
}
catch (Exception e)
{
e.printStackTrace();
}
}}
Producer:
public class Producer implements Runnable {
private BlockingQueue<String> m_Queue;
private String m_FilePath;
public Producer(BlockingQueue<String> i_Queue, String i_FilePath)
{
m_Queue = i_Queue;
m_FilePath = i_FilePath;
}
@Override
public void run()
{
try (BufferedReader reader = new BufferedReader(new FileReader(m_FilePath)))
{
String line;
while ((line = reader.readLine()) != null)
{
m_Queue.put(line);
System.out.println(line + " Was added to queue.");
}
//Adding an exit message.
m_Queue.put("EOF");
System.out.println("EOF Was added to queue.");
}
catch (IOException | InterruptedException e)
{
e.printStackTrace();
}
}}
ProducerConsumerService
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
//Start the producer; Will read the file
threadPool.execute(new Producer(queue, args[0]));
for (int i = 0; i < 4; i++)
{
System.out.println("Generating consumer " + i+1);
threadPool.execute(new Consumer(queue));
}
try
{
threadPool.shutdown();
System.out.println("Shutting down threads.");
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
System.out.println("Terminated successfully.");
}
catch (InterruptedException e)
{
e.printStackTrace();
} }
The input file consists of the numbers 1-20, each number in a new line.
When I run the program, I can see that all numbers are read, but the program seems to hang/stuck and I don't see the "Terminated successfully" message.
This doesn't happen if I use a single thread for reading and a single thread for printing on screen, but using 1 thread defies my need of a "multi-threaded" program.
My guess is that I'm forgetting to release a resource, but I have no idea why.
Upvotes: 1
Views: 590
Reputation: 44985
Your problem is that you use take()
in your Consumer
which:
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
And you test if the returned value is EOF
(not properly BTW you need to use equals) but you put it only once into your queue and you have 4 Consumer
so 3 Consumer
still wait for it.
So you should use poll()
instead as below:
while((referenceID1 = m_Queue.poll()) != null && !Objects.equals(referenceID1, "EOF"))
Or simply if you get rid of EOF
while((referenceID1 = m_Queue.poll()) != null)
Upvotes: 2