VivekGhosh

Reputation: 95

Java Concurrency and Multithreading

This is kind of a big question.

I am trying to build an ordered multiple-producer, single-consumer scenario in Java. It is ordered in the sense that after producer1, only producer2 gets control of the queue, then producer3, then producer1 again, and so on. To check that this works under every scenario, I gave the three producers three different priorities:

producer1 - Thread.NORM_PRIORITY - 4

producer2 - Thread.NORM_PRIORITY + 5

producer3 - Thread.NORM_PRIORITY

Instead of printing what is put in the queue and what is consumed, I keep a counter of how many times each producer thread is handed control of the queue, and in what order, and I print those counts in the consumer thread.

The code is provided after the outputs. I am confused by one particular behaviour of the threads. The code posted below works as I want it to, but if I replace this

while(flag==false)
    wait();
if(getIndex()!=next)
    return;

in the put() function of q.java, with this

while(flag==false && getIndex()!=next)
    wait();

the producer threads are handed control of the queue erratically. With the first code snippet, I get the following output (counts for producers 1, 2 and 3 respectively):

125  125  125
126  125  125
126  126  125
126  126  126

Producer1 is getting control of the queue first, then 2 then 3, and then 1 again.

But with the alternate option I am getting this output

2  6  8
2  6  8
2  6  8

The same producer keeps getting control of the queue.

Shouldn't a waiting thread be unable to gain control of the queue unless its index matches the index of the thread that is supposed to get it? For example, if next is 2 and producer3 is handed control of the queue, shouldn't it go back into wait because of the while condition, leaving the queue free for another thread, with the process repeating until producer2 gets it?

QUEUE

import java.util.*;
class q
{
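private volatile int size;        // number of producers
private volatile int clicks[];    // clicks[i] = times producer i was handed the queue (index 0 unused)
private volatile boolean flag;    // true: a producer may put; false: the consumer may get
private volatile int next;        // index of the producer whose turn it is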
public q(int size)
{
    this.size = size;
    clicks = new int[size+1];
    flag = true;
    next = 1;
}
private synchronized int getIndex()
{
    String name = Thread.currentThread().getName();
    return (int)(name.charAt(name.length()-1))-48;
}
private synchronized void show()
{
    //System.out.println("Got control -> "+name+" for index "+index);
    if(flag==true)
    {
        int index = getIndex();
        /*
        System.out.println("Control provided to "+index);
        Scanner s = new Scanner(System.in);
        System.out.println("Press enter to continue");
        String c = s.next();
        */
        clicks[index]+=1;
        next = (index%size)+1;
        //System.out.println("Provide control to "+next);
    }
    else
    {
        int i;
        for(i = 1;i<=size;i++)
            System.out.print(clicks[i]+"  ");
        System.out.println();
    }
}
public synchronized void put()
{
    try
    {
        while(flag==false)
            wait();
        if(getIndex()!=next)
            return;
        show();
        flag = false;
        notify();
    }
    catch(Exception e)
    {
        System.out.println("Exception caught - "+e);
    }
}
public synchronized void get()
{

    try
    {
        while(flag==true)
            wait();
        show();
        flag = true;
        notifyAll();
    }
    catch(Exception e)
    {
        System.out.println("Exception caught - "+e);
    }
}
}

PRODUCER

class producer implements Runnable
{
private q queue;
public producer(q queue)
{
    this.queue = queue;
}
public void run()
{
    try
    {
        //int i;
        while(true)
            queue.put();
    }
    catch(Exception e)
    {
        System.out.println("Exception caught - "+e);
    }
}
}

CONSUMER

class consumer implements Runnable
{
private q queue;
public consumer(q queue)
{
    this.queue = queue;
}
public void run()
{
    try
    {
        while(true)
            queue.get();
    }
    catch(Exception e)
    {
        System.out.println("Exception caught - "+e);
    }
}
}

TESTCLASS

class testclass
{
private q queue;
private producer p1;    //lowest priority
private producer p2;    //highest priority
private producer p3;    //normal priority
private consumer c;
private Thread pt1;
private Thread pt2;
private Thread pt3;
private Thread ct;
public testclass()
{
    queue = new q(3);
    p1 = new producer(queue);
    p2 = new producer(queue);
    p3 = new producer(queue);
    c = new consumer(queue);
    pt1 = new Thread(p1,"producer1");
    pt2 = new Thread(p2,"producer2");
    pt3 = new Thread(p3,"producer3");
    ct = new Thread(c,"consumer");
}
public void begin()
{
    pt2.setPriority(Thread.NORM_PRIORITY + 5);
    pt1.setPriority(Thread.NORM_PRIORITY - 4);
    //pt3.setPriority(Thread.NORM_PRIORITY - 3);
    pt1.start();
    pt2.start();
    pt3.start();
    ct.start();
}
public static void main(String args[])
{
    try
    {
        testclass t = new testclass();
        t.begin();
    }
    catch(Exception e)
    {
        System.out.println("Exception caught - "+e);
    }
}
}   

Upvotes: 0

Views: 348

Answers (2)

TwoThe

Reputation: 14269

There are so many things wrong with this code, it is hard to tell where the actual problem is. I strongly suggest that you upgrade your knowledge on threading first by reading a good book on the topic.

The major problem here is that you confuse thread priority with order of execution. In general, the order of execution among threads is undefined: unless you enforce an order, there is none. Thread priority is only a hint to the scheduler about which threads to prefer when there are more runnable threads than CPUs to execute them. It will not enforce any order of execution otherwise.

For example, when several threads try to enter a synchronized function, one of them is granted access, but which one is not specified. It could be the high-priority thread, but it could just as well be any other. Since all your functions are synchronized, all threads are constantly put on hold, so even thread priority won't do a thing, because most of the time the threads are waiting on their lock anyway.
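To illustrate what "enforcing the order" can look like, here is a minimal sketch that hands a turn from one thread to the next with a ring of semaphores. It is not the code from the question: the class name SemaphoreRing, the run helper and its parameters are made up for this example, and a semaphore ring is only one of several ways to do this. The priorities are deliberately set "against" the desired order to show that they play no part in it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical example class, not part of the question's code.
class SemaphoreRing {
    // Runs `workers` threads for `rounds` rounds each and returns the order in which they acted.
    static List<Integer> run(int workers, int rounds) {
        Semaphore[] turn = new Semaphore[workers];
        for (int i = 0; i < workers; i++) {
            turn[i] = new Semaphore(i == 0 ? 1 : 0); // only the first worker may start
        }
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    turn[id].acquireUninterruptibly();  // block until it is this worker's turn
                    order.add(id + 1);                  // record who acted (1-based, like the producers)
                    turn[(id + 1) % workers].release(); // hand the turn to the next worker
                }
            });
            // The first worker gets the lowest priority; the order is still enforced by the semaphores.
            threads[i].setPriority(i == 0 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY);
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2)); // prints [1, 2, 3, 1, 2, 3]
    }
}
```

Each worker can only proceed once its predecessor has released its semaphore, so the order is fixed by the program rather than left to the scheduler.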

Upvotes: 1

Zsolt V

Reputation: 517

This looks like a threading and concurrency problem, but it is not.

You are dealing with logical operators:

Your code

while(flag==false && getIndex()!=next)
  wait();

If flag is true, the whole expression is false no matter what getIndex() returns, so the thread skips the wait and carries on even when it is not its turn. What you really need is:

while(flag==false || getIndex()!=next)
  wait();
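A minimal, self-contained sketch of the || version follows. It is a simplified stand-in for the q class in the question, not a drop-in replacement: the class name OrderedTurns, the run helper and passing the producer index as a parameter (instead of parsing the thread name) are assumptions made for this example. One further change is my own assumption rather than part of the fix above: put() calls notifyAll() instead of notify(), because with the || condition other producers can be sitting in wait() as well, and a single notify() might wake one of them instead of the consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class; a simplified stand-in for the q class in the question.
class OrderedTurns {
    private final int producers;   // producers are numbered 1..producers
    private boolean flag = true;   // true: a producer may put; false: the consumer may get
    private int next = 1;          // index of the producer whose turn it is
    private final List<Integer> order = new ArrayList<>(); // who was served, in order

    OrderedTurns(int producers) {
        this.producers = producers;
    }

    synchronized void put(int index) throws InterruptedException {
        // Wait while the consumer has not caught up OR it is another producer's turn.
        while (!flag || index != next) {
            wait();
        }
        order.add(index);
        next = (index % producers) + 1;
        flag = false;
        notifyAll(); // assumption: notify() could wake a waiting producer instead of the consumer
    }

    synchronized void get() throws InterruptedException {
        while (flag) {
            wait();
        }
        flag = true;
        notifyAll();
    }

    synchronized List<Integer> order() {
        return new ArrayList<>(order);
    }

    // Starts the producers and one consumer, waits for them, and returns the serving order.
    static List<Integer> run(int producers, int rounds) {
        OrderedTurns q = new OrderedTurns(producers);
        List<Thread> threads = new ArrayList<>();
        for (int p = 1; p <= producers; p++) {
            final int index = p;
            threads.add(new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        q.put(index);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "producer" + p));
        }
        threads.add(new Thread(() -> {
            try {
                for (int i = 0; i < producers * rounds; i++) {
                    q.get();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer"));
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.order();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4)); // prints [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3]
    }
}
```

A producer that wakes up out of turn simply re-checks the condition and goes back to waiting, which is the behaviour the question expected from the while loop.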

Upvotes: 1
