Chris
Chris

Reputation: 25

Coordinating Java Threadpool with conditions

I have a thread-pool that is getting stuck. The program below creates an instance of ThreadLocks and passes in a 1D array of integer values that act as "tasks".

The 1d Array represents the number of tasks needing to be done. (EX: aValues = [100,200,300]) is 100 tasks, 200 tasks and 300 tasks. Each "Task layer" in the previous index must be completed before the program moves to the proceeding index of tasks. So the 100 tasks in index 0 must be completed before the 200 tasks in index 1 is started.

My attempt to coordinate the tasks is done by using another int array of equal length to aValues (called syncValues). The program is designed to add 1 to the index in syncvalues when a task has been completed on that index. Therefore, when all the tasks in an aValues layer is complete, the index of aValues and syncValues should be the same and the program can continue.

This appears to work when the number of tasks are small, however if the number of tasks are large the program appears to hang indefinitely.

How can I fix the code for a large number of tasks?

I have stripped my code to make easier to understand.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class main {

    public static void main(String[] args)
    {
        //Shows the number of tasks needing completion before moving to correct index.
        int values[] = {500,500,500};

        //The object to complete the tasks
        ThreadLocks obAnne = new ThreadLocks(values);

        //runs through the "tasks"
        obAnne.calculate();
    }
}

class ThreadLocks {

    /* Thread Lock for sync */
    public static ReentrantLock lock = new ReentrantLock();

    /* ThreadPool that generates threads */
    private ExecutorService ThreadPool = Executors.newFixedThreadPool(1);

    /* Nodes complete a set of tasks, add 1 to syncValues after task completion*/
    private Node[][] nodes;
    private int[] syncValues;

    /* The array that holds the "tasks" to be completed */
    private int aValues[];


    public ThreadLocks(int[] aValues)
    {
        this.aValues = aValues;
        this.syncValues = new int[aValues.length];
        this.nodes = createNewTopology(aValues);
    }


    /**
    * Create the nodes in a 2d array. these nodes hold the tasks to be created.
     */
    private Node[][] createNewTopology(int[] topology)
    {
        Node[][] tmpTopology = new Node[topology.length][];

        for (int layerX = 0; layerX < topology.length; layerX++)
        {   
            Node[] layer = new Node[topology[layerX]];
            syncValues[layerX] = 0;
            for (int layerY = 0; layerY < topology[layerX]; layerY++)
            {
                    layer[layerY] = new Node(layerX);
            }
            tmpTopology[layerX] = layer;
        }
        return tmpTopology;     
    }

    /* Process Inputs, produce outputs */
    public void calculate()
    {
        /* starts a time for dubug purposes */
        long Time = System.currentTimeMillis();
        /* starts at index 1 as index 0 are inputs */
        for(int layerX = 1; layerX < nodes.length; layerX++)
        {           
            for (int layerY = 0; layerY < nodes[layerX].length; layerY++)
            {
                ThreadPool.execute(nodes[layerX][layerY]);
            }

            while(syncValues[layerX] < aValues[layerX]) {}; 
        }
        /*tracks time taken.. its how i know its done */
        System.out.print("Current Time :" + (System.currentTimeMillis() - Time));
    }


    private class Node implements Runnable
    {
        /* sets the node layer. describes what index of aValues the node is in.*/
        private int layerX;

        public Node(int layerX) {
            this.layerX = layerX;
        }

        /* Processes Data for Node on separate thread */
        @Override
        public void run()
        {   
            /* TASK WOULD BE PUT HERE */

            lock.lock();
            try {syncValues[layerX]++;}
            catch (Exception e){System.out.print("LockError");}
            finally{lock.unlock();}
        }
    }
}

Upvotes: 1

Views: 57

Answers (1)

Jihed Amine
Jihed Amine

Reputation: 2208

Why not use an AtomicInteger? it handles concurrency for you without you having to write synchronization code.

Upvotes: 1

Related Questions