Reputation: 365
Here is my code for the ConcurrentApp class, which is the source of my problem:
class Processor implements Runnable {
private int id;
private Integer interaction;
private Set<Integer> subset;
private Set<Integer> y;
private Object lock = new Object();
public DCP<BSN> dcp;
public Processor(int id, Integer interaction, Set<Integer> subset, DCP<BSN> dcp, Set<Integer> y) {
this.id = id;
this.interaction = interaction;
this.subset= subset;
this.dcp = dcp;
this.y = y;
}
public void run() {
//System.out.println("Starting: " + this.id);
if (this.y.contains(this.interaction)){
this.subset.add(this.interaction);
processRemoval(this.subset);
}
//System.out.println("Completed: " + this.id);
}
public void processRemoval(Set<Integer> collection){
synchronized(this.lock) { // lock is an instance field here, so each Processor has its own lock
for (Iterator<Integer> iter = this.y.iterator(); iter.hasNext(); ) {
int element = iter.next();
while(element != this.interaction){
element = iter.next();
}
this.dcp.increaseScore(collection);
if (!collection.contains(this.interaction)) {
System.out.println(element + " WAS REMOVED by thread " + this.id);
iter.remove();
}
}
}
}
}
public class ConcurrentApp {
public void multiRP (DCP<BSN> dcp, int threads) {
ConcurrentHashMap<Integer,Boolean> x = new ConcurrentHashMap<Integer,Boolean>();
ConcurrentHashMap<Integer,Boolean> z = new ConcurrentHashMap<Integer,Boolean>();
Set<Integer> y = (Set<Integer>) Collections.newSetFromMap(x);
y.addAll(dcp.PA);
Set<Integer> zeta = (Set<Integer>) Collections.newSetFromMap(z);
ExecutorService executor = Executors.newFixedThreadPool(threads);
int i =1;
while ((y.size() > i) && (i <= dcp.R)){
for (Iterator<Integer> iterator = y.iterator(); iterator.hasNext();){
zeta.addAll(y);
Integer interaction = iterator.next();
zeta.remove(interaction);
ArrayList<Set<Integer>> subsets = dcp.getSubsets(zeta, i);
for (int j = 0; j< subsets.size(); j++){
executor.submit(new Processor(j, interaction, subsets.get(j), dcp, y));
}
}
i++;
}
executor.shutdown();
System.out.println("All tasks submitted");
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(y);
dcp.PA = new ArrayList<Integer>(y);
System.out.println("All tasks completed");
}
}
The associated code for the class DCP, which contains some helper functions, is here:
public class DCP<E> extends CondInd<E> {
public final int R; //assumed maximum # of nodes blocking any node from target
public ArrayList <Integer> PA; //set of all parents of the target node (should be static)
//public Node NULL; //NULL is model with no parents of target (should be final static)
public E scoreType = null;
public ScoringFunction<? super E> scoringFunction;
public double calcScore(E sT, Set<Integer> parentIndices) {
ArrayList<Integer> list = new ArrayList<Integer>(parentIndices);
return this.scoringFunction.score(sT, list);
}
public double calcScore(E sT, ArrayList<Integer> parentIndices) {
return this.scoringFunction.score(sT, parentIndices);
}
//helper for actual subsets method
private void getSubsets(ArrayList<Integer> input, int length, int start_index, Set<Integer> acc, ArrayList<Set<Integer>> sol){
//base case
if (acc.size() == length){
sol.add(new HashSet<>(acc));
return;
}
//recursive solution
if (start_index == input.size()) return;
int x = input.get(start_index);
acc.add(x);
//first recursion
getSubsets(input, length, start_index+1, acc, sol);
acc.remove(x);
//second recursion, after x removed
getSubsets(input, length, start_index+1, acc, sol);
}
//different arguments and returns a list of subsets
public ArrayList<Set<Integer>> getSubsets(ArrayList<Integer> input, int length){
ArrayList<Set<Integer>> sol = new ArrayList<>();
getSubsets(input, length, 0, new HashSet<Integer>(), sol);
return sol;
}
//different arguments and returns a list of subsets
public ArrayList<Set<Integer>> getSubsets(Set<Integer> input, int length){
ArrayList<Set<Integer>> sol = new ArrayList<>();
ArrayList<Integer> copy = new ArrayList<Integer>(input);
getSubsets(copy, length, 0, new HashSet<Integer>(), sol);
return sol;
}
//removes the element from the input that increases the score by the highest value
public void increaseScore(Set<Integer> input){
int index = -1;
double score = calcScore(scoreType,input);
List<Integer> list = new ArrayList<Integer>(input);
for (Integer element : list) {
ArrayList<Integer> copy_list = new ArrayList<Integer>(list);
copy_list.remove(element);
if (calcScore(scoreType,copy_list) > score){
index = list.indexOf(element);
score = calcScore(scoreType,copy_list);
}
}
if (index != -1)
input.remove(list.get(index));
}
public DCP(int maximumNodes, E scoreType, ScoringFunction<? super E> scoringFunction, ArrayList<Integer> parents){
this.R = maximumNodes;
this.scoreType = scoreType;
this.scoringFunction = scoringFunction;
this.PA = parents;
}
}
When I run my code in ConcurrentApp with threads = 1, I receive the following printout to my console based on my print statements:
All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 0
2 WAS REMOVED by thread 0
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds
where the first number corresponds to the integer removed from my list (e.g. "0 WAS REMOVED by thread 2" means that the value 0 was removed from the main list y). This output makes sense since every value that needed to be deleted was deleted once and gives the expected result [7, 8], which should be the only two values not deleted in this case.
However, when I run my code with >1 thread, I get the following output:
All tasks submitted
0 WAS REMOVED by thread 2
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 0
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 1
3 WAS REMOVED by thread 2
4 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
5 WAS REMOVED by thread 1
6 WAS REMOVED by thread 0
6 WAS REMOVED by thread 1
7 WAS REMOVED by thread 1
7 WAS REMOVED by thread 2
8 WAS REMOVED by thread 1
8 WAS REMOVED by thread 0
[]
All tasks completed
Program completed in :0 seconds
As you can see, in some cases the same value was removed multiple times because multiple threads decided that it needed to be removed. This also changes my result: I get [] instead of [7, 8], because with multiple threads the program incorrectly decides that 7 and 8 need to be deleted from the main list y. I fixed the multiple-deletion problem by adding static to the lock field:
private static Object lock = new Object();
However, now I have the problem that the runtime does not change when I increase the number of threads. The output of using threads >= 1 after adding static is below:
All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds
I get the correct result, but the runtime is exactly the same whether I use 1 thread or many.
Question: As I see it, there are two possible solutions:
1) remove the static keyword on the lock and find a way for a thread performing the removal to tell the other threads to skip the removed value
2) keep the static keyword and find out why my program only utilizes 1 thread when more are available.
Any ideas are greatly appreciated!
Upvotes: 0
Views: 75
Reputation: 8803
(You really should learn to post a minimal code sample, my friend.)
My diagnosis: your program behaves the same with multiple threads because processRemoval acquires the shared lock before processing the whole set, so it is no wonder that the set is only ever processed by one thread at a time.
In these cases, the usual approach is to synchronize the threads before processing each item. So it seems that you should move your synchronized block into the loop.
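A minimal sketch of that idea, assuming the expensive part of the work does not touch shared state. The names NarrowLockSketch, slowScore and keepEvenScores are hypothetical and are not taken from the question's code; slowScore merely stands in for a costly call such as the scoring function. The point is only that the lock is held for the short mutation and not for the computation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class NarrowLockSketch {
    private static final Object LOCK = new Object();

    // Hypothetical stand-in for an expensive, side-effect-free computation.
    static long slowScore(int value) {
        long acc = 0;
        for (int i = 0; i < 100_000; i++) {
            acc += (value * 31L + i) % 7;
        }
        return acc;
    }

    // Collects the items whose score is even. Scoring runs in parallel;
    // only the short update of the shared list is synchronized.
    static List<Integer> keepEvenScores(List<Integer> items, int threads) {
        List<Integer> kept = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (Integer item : items) {
            executor.submit(() -> {
                long score = slowScore(item);   // no lock held during the slow part
                if (score % 2 == 0) {
                    synchronized (LOCK) {       // lock only around the shared mutation
                        kept.add(item);
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            items.add(i);
        }
        System.out.println(keepEvenScores(items, 4));
    }
}
```

The order of the returned list depends on thread scheduling, so sort it if a stable order matters.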
However, in this case you are modifying the set within the loop, and this is likely to produce a ConcurrentModificationException. To avoid this, I suggest you also replace HashSet with a concurrent implementation of Set, for example ConcurrentSkipListSet or CopyOnWriteArraySet; take your pick.
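As a rough illustration of working with a concurrent Set, the sketch below relies on the fact that remove() on such a set is atomic and returns true only for the call that actually removed the element, so duplicate "WAS REMOVED" reports can be avoided without one lock around the whole loop. The class PerItemRemovalSketch and the method shouldRemove are hypothetical; shouldRemove is a placeholder for the real decision logic, and the set here comes from ConcurrentHashMap.newKeySet() as one possible choice:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PerItemRemovalSketch {

    // Hypothetical placeholder for the real decision; here even values are removable.
    static boolean shouldRemove(int value) {
        return value % 2 == 0;
    }

    // Submits several competing tasks per candidate and returns how many
    // removals actually succeeded across all of them.
    static int removeConcurrently(Set<Integer> shared, List<Integer> candidates,
                                  int threads, int tasksPerCandidate) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger removals = new AtomicInteger();
        for (Integer candidate : candidates) {
            for (int t = 0; t < tasksPerCandidate; t++) {
                executor.submit(() -> {
                    // remove() returns true for exactly one of the competing tasks.
                    if (shouldRemove(candidate) && shared.remove(candidate)) {
                        removals.incrementAndGet();
                    }
                });
            }
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return removals.get();
    }

    public static void main(String[] args) {
        Set<Integer> shared = ConcurrentHashMap.newKeySet();
        List<Integer> candidates = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            shared.add(i);
            candidates.add(i);
        }
        int removed = removeConcurrently(shared, candidates, 4, 3);
        System.out.println(removed + " removals, remaining " + shared);
    }
}
```

Checking the return value of remove() is what lets a task tell whether another task got there first. It does not by itself make a longer check-then-act sequence atomic.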
Upvotes: 0