g00glechr0me
g00glechr0me

Reputation: 365

Java program runtime unchanged when adding more threads and produces different results

Here is my code for the ConcurrentApp class, which is the source of my problem:

class Processor implements Runnable {

    private int id;
    private Integer interaction;
    private Set<Integer> subset;
    private Set<Integer> y;
    private Object lock = new Object();

    public DCP<BSN> dcp;



    public Processor(int id, Integer interaction, Set<Integer> subset, DCP<BSN> dcp, Set<Integer> y) {
        this.id = id;
        this.interaction = interaction;
        this.subset= subset;
        this.dcp = dcp;
        this.y = y;
    }

    public void run() {
        //System.out.println("Starting: " + this.id);
        if (this.y.contains(this.interaction)){
            this.subset.add(this.interaction);
            processRemoval(this.subset);
        }

        //System.out.println("Completed: " + this.id);
    }

    public void processRemoval(Set<Integer> collection){
        synchronized(Processor.lock) {
            for (Iterator<Integer> iter = this.y.iterator(); iter.hasNext(); ) {
                int element = iter.next();
                while(element != this.interaction){
                    element = iter.next();
                }
                this.dcp.increaseScore(collection);
                if (!collection.contains(this.interaction)) {
                    System.out.println(element + " WAS REMOVED by thread " + this.id);
                    iter.remove();
                }
            }
        }
    }

}


public class ConcurrentApp {

    public void multiRP (DCP<BSN> dcp, int threads) {

        ConcurrentHashMap<Integer,Boolean> x = new ConcurrentHashMap<Integer,Boolean>();
        ConcurrentHashMap<Integer,Boolean> z = new ConcurrentHashMap<Integer,Boolean>();
        Set<Integer> y = (Set<Integer>) Collections.newSetFromMap(x);
        y.addAll(dcp.PA);
        Set<Integer> zeta = (Set<Integer>) Collections.newSetFromMap(z);
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        int i =1;
        while ((y.size() > i) && (i <= dcp.R)){
            for (Iterator<Integer> iterator = y.iterator(); iterator.hasNext();){
                zeta.addAll(y);
                Integer interaction = iterator.next();
                zeta.remove(interaction);
                ArrayList<Set<Integer>> subsets = dcp.getSubsets(zeta, i);
                for (int j = 0; j< subsets.size(); j++){
                    executor.submit(new Processor(j, interaction, subsets.get(j), dcp, y)); 
                }
            }
            i++;
        }
        executor.shutdown();
        System.out.println("All tasks submitted");
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(y);
        dcp.PA = new ArrayList<Integer>(y);
        System.out.println("All tasks completed");
    }
}

and associated code for the class DCP, which contains some helper functions, is here:

public class DCP<E> extends CondInd<E> {
    public final int R;             //assumed maximum # of nodes blocking any node from target
    public ArrayList <Integer> PA;  //set of all parents of the target node (should be static)
    //public Node NULL;     //NULL is model with no parents of target (should be final static)
    public E scoreType = null;
    public ScoringFunction<? super E> scoringFunction;

    public double calcScore(E sT, Set<Integer> parentIndices) {
        ArrayList<Integer> list = new ArrayList<Integer>(parentIndices);
        return this.scoringFunction.score(sT, list);
    }

    public double calcScore(E sT, ArrayList<Integer> parentIndices) {
        return this.scoringFunction.score(sT, parentIndices);
    }


    //helper for actual subsets method
    private void getSubsets(ArrayList<Integer> input, int length, int start_index, Set<Integer> acc, ArrayList<Set<Integer>> sol){
        //base case
        if (acc.size() == length){
            sol.add(new HashSet<>(acc));
            return;
        }
        //recursive solution
        if (start_index == input.size()) return;
        int x = input.get(start_index);
        acc.add(x);
        //first recursion
        getSubsets(input, length, start_index+1, acc, sol);
        acc.remove(x);
        //second recursion, after x removed
        getSubsets(input, length, start_index+1, acc, sol);
    }

    //different arguments and returns a list of subsets
    public ArrayList<Set<Integer>> getSubsets(ArrayList<Integer> input, int length){
        ArrayList<Set<Integer>> sol = new ArrayList<>();
        getSubsets(input, length, 0, new HashSet<Integer>(), sol);
        return sol;
    }

    //different arguments and returns a list of subsets
    public ArrayList<Set<Integer>> getSubsets(Set<Integer> input, int length){
        ArrayList<Set<Integer>> sol = new ArrayList<>();
        ArrayList<Integer> copy = new ArrayList<Integer>(input);
        getSubsets(copy, length, 0, new HashSet<Integer>(), sol);
        return sol;
    }


    //removes the element from the input that increases the score by the highest value
    public void increaseScore(Set<Integer> input){
        int index = -1;
        double score = calcScore(scoreType,input);
        List<Integer> list = new ArrayList<Integer>(input);
        for (Integer element : list) {
            ArrayList<Integer> copy_list = new ArrayList<Integer>(list);
            copy_list.remove(element);
            if (calcScore(scoreType,copy_list) > score){
                index = list.indexOf(element);
                score = calcScore(scoreType,copy_list);
            }
        }
        if (index != -1)
            input.remove(list.get(index));
    }

    public DCP(int maximumNodes, E scoreType, ScoringFunction<? super E> scoringFunction, ArrayList<Integer> parents){
        this.R = maximumNodes;
        this.scoreType = scoreType;
        this.scoringFunction = scoringFunction;
        this.PA = parents;
    }
}

When I run my code in ConcurrentApp with threads = 1, I receive the following printout to my console based on my print statements:

All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 0
2 WAS REMOVED by thread 0
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds

where the first number corresponds to the integer removed from my list (e.g. "0 WAS REMOVED by thread 2" means that the value 0 was removed from the main list y). This output makes sense since every value that needed to be deleted was deleted once and gives the expected result [7, 8], which should be the only two values not deleted in this case.

However, when I run my code with >1 thread, I get the following output:

All tasks submitted
0 WAS REMOVED by thread 2
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 0
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 1
3 WAS REMOVED by thread 2
4 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
5 WAS REMOVED by thread 1
6 WAS REMOVED by thread 0
6 WAS REMOVED by thread 1
7 WAS REMOVED by thread 1
7 WAS REMOVED by thread 2
8 WAS REMOVED by thread 1
8 WAS REMOVED by thread 0
[]
All tasks completed
Program completed in :0 seconds

As you can see, in some cases the same value was removed multiple times because multiple threads decided that that value needed to be removed. Another problem is that this also changes my result by giving me an [] instead of [7, 8] because for some reason when I use multiple threads, the program incorrectly decides that 7 and 8 need to be deleted from the main list y. I fixed this problem of multiple thread deletion by adding static to the lock field:

private static Object lock = new Object();

However, now I have the problem that the runtime does not change when I increase the number of threads. The output of using threads >= 1 after adding static is below:

All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds

The number of threads does not improve the runtime but I get the correct result. This result and runtime are the exact same whether I use 1 thread or many.

Question: As I see it, there are two possible solutions:

1) remove the static keyword on the lock and find a way for a thread performing the removal to tell the other threads to skip the removed value

2) keep the static keyword and find out why my program only utilizes 1 thread when more are available.

Any ideas are greatly appreciated!

Upvotes: 0

Views: 75

Answers (1)

Little Santi
Little Santi

Reputation: 8803

(You really have to learn to post a minimum part of code, my friend).

My diagnosis: Your program behaves the same in multithreading because processRemoval is synchronizing with the rest of threads before processing the whole set, so no wonder why the set is being processed only by the first thread.

In these cases, the usual way is to synchronize the threads before processing each item. So, it seems that you should move your synchronize block into the loop.

However, in this case you are modifying the set within the loop, and this is likely to produce ConcurrentModificationException. To avoid this, I suggest you also replace using of HashSet by another concurrent implementation of Set. For example, ConcurrentSkipListSet or CopyOnWriteArraySet, do your choice.

Upvotes: 0

Related Questions