RNO

Reputation: 223

Java ExecutorService Performance

Hi, I have implemented a method that calculates the mode of an array of millions of elements (integers). I am now comparing a sequential version to a (supposedly) improved version that uses an ExecutorService. Unfortunately, the performance is not as good as expected:

Sequentially iterating the HashMap (version 0)

#size       #time      #memory
10000000    13772ms    565MB
20000000    35355ms    1135MB
30000000    45879ms    1633MB

Assigning jobs to an ExecutorService (version 2)

#size       #time      #memory
10000000    16186ms    573MB
20000000    34561ms    1147MB
30000000    54792ms    1719MB

The code for the Executor Service is as follows:

 /* Optimised-Threaded Method to calculate the Mode */
    private int getModeOptimisedThread(int[] mybigarray){
        System.out.println("calculating mode (optimised w/ ExecutorService)... ");

        int mode = -1;

        //create a TreeMap to calculate the frequencies
        TreeMap<Integer, Integer> treemap = new TreeMap<Integer, Integer>();

        //for each integer in the array, we put an entry into the map with the 'array value' as the 'key' and its frequency as the 'value'.
        for (int i : mybigarray) {
            //we check if that element already exists in the map, by getting the element with key 'i'
            // if the element exists, we increment the frequency, otherwise we insert it with frequency = 1;
            Integer frequency = treemap.get(i);
            int value = 0;
            if (frequency == null){ //element not found
                value = 1;
            }
            else{                   //element found
                value = frequency + 1;
            }

            //insert the element into the map
            treemap.put(i, value);
        }



        //Look for the most frequent element in the map
        int maxCount = 0;

        int n_threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(n_threads);


        //create a common variable to store maxCount and mode values
        Result r = new Result(mode, maxCount);

        //set the number of jobs
        int num_jobs = 10;
        int job_size = treemap.size()/num_jobs;        

        System.out.println("Map size "+treemap.size());
        System.out.println("Job size "+job_size);

        //new MapWorker(map, 0, halfmapsize, r);
        int start_index, finish_index;

        List<Callable<Object>> todolist = new ArrayList<Callable<Object>>(num_jobs);

        //assign threads to pool

            for (int i=0; i<num_jobs; i++)
            {   
                    start_index=i*job_size;
                    finish_index = start_index+job_size;

                    System.out.println("start index: "+start_index+". Finish index: "+finish_index);
                    todolist.add(Executors.callable(new MapWorker(treemap.subMap(start_index, finish_index), r)));

            }        
       try{
           //invoke all will not return until all the tasks are completed
           es.invokeAll(todolist);
        } catch (Exception e) { 
            System.out.println("Error in the Service executor "+e);
        } finally {
           //finally the result
            mode = r.getMode(); 
        }

        //return the result
        return mode;
    }

Any suggestions on the quality of the ExecutorService code? This is the first time I have used an ExecutorService.

Edit:

Worker:

public class MapWorker implements Runnable{

    private int index;
    private int size;
    private int maxCount;
    private Result result;
    private  Map <Integer, Integer> map;

    //Constructor    
    MapWorker( Map <Integer, Integer> _map, Result _result){        
        this.maxCount = 0;
        this.result = _result;
        map = _map;
    }

    public void run(){               
        for (Map.Entry<Integer, Integer> element : map.entrySet()) {
            if (element.getValue() > result.getCount()) {                
                 result.setNewMode(element.getKey(),element.getValue());                 
            }
        }         
    }

}

and Result class:

public class Result {
    private int mode;
    private int maxCount;

    Result(int _mode, int _maxcount){
        mode = _mode;
        maxCount = _maxcount;
    }

    public synchronized void setNewMode(int _newmode, int _maxcount) {
        this.mode = _newmode;
        this.maxCount = _maxcount;
    }

    public int getMode() {
        return mode;
    }

    public synchronized int getCount() {
        return maxCount;
    }

}

Upvotes: 1

Views: 1328

Answers (2)

shams

Reputation: 3508

The bulk of the work is done while computing the frequencies, and that cost dominates any benefit you get from parallelizing the final search for the maximum. Parallelize the frequency computation instead: have each worker compute frequencies locally for its part of the array, then update a global frequency store at the end. You can use AtomicInteger for the counts in the global store to ensure thread safety. Once the frequencies have been computed, you can compute the mode sequentially, since a single traversal of the map is much cheaper than the counting.

Something like the following should work better. (Edit: modified the updateScore() method to fix a data race.)

    private static class ResultStore {

        private Map<Integer, AtomicInteger> store = new ConcurrentHashMap<Integer, AtomicInteger>();

        public int size() {
            return store.size();
        }

        public int updateScore(int key, int freq) {
            AtomicInteger value = store.get(key);
            if (value == null) {
                store.putIfAbsent(key, new AtomicInteger(0));
                value = store.get(key);
            }
            return value.addAndGet(freq);
        }

        public int getMode() {
            int mode = 0;
            int modeFreq = 0;
            for (Integer key : store.keySet()) {
                int value = store.get(key).intValue();
                if (modeFreq < value) {
                    modeFreq = value;
                    mode = key;
                }
            }
            return mode;
        }
    }

    private static int computeMode(final int[] mybigarray) {
        int n_threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(n_threads);

        final ResultStore rs = new ResultStore();

        //set the number of jobs
        int num_jobs = 10;
        int job_size = mybigarray.length / num_jobs;

        System.out.println("Map size " + mybigarray.length);
        System.out.println("Job size " + job_size);

        List<Callable<Object>> todolist = new ArrayList<Callable<Object>>(num_jobs);
        for (int i = 0; i < num_jobs; i++) {
            final int start_index = i * job_size;
            final int finish_index = start_index + job_size;
            System.out.println("Start index: " + start_index + ". Finish index: " + finish_index);

            todolist.add(Executors.callable(new Runnable() {
                @Override
                public void run() {
                    final Map<Integer, Integer> localStore = new HashMap<Integer, Integer>();
                    for (int i = start_index; i < finish_index; i++) {
                        final Integer loopKey = mybigarray[i];
                        Integer loopValue = localStore.get(loopKey);
                        if (loopValue == null) {
                            localStore.put(loopKey, 1);
                        } else {
                            localStore.put(loopKey, loopValue + 1);
                        }
                    }
                    for (Integer loopKey : localStore.keySet()) {
                        final Integer loopValue = localStore.get(loopKey);
                        rs.updateScore(loopKey, loopValue);
                    }
                }
            }));
        }
        try {
            //invoke all will not return until all the tasks are completed
            es.invokeAll(todolist);
        } catch (Exception e) {
            System.out.println("Error in the Service executor " + e);
        }
        return rs.getMode();
    }
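As a variation on the same idea, each worker could return its local frequency map and let the calling thread merge them, so no shared map is needed at all. The following is only a sketch under some assumptions: it uses Java 8 features (lambdas, `Map.merge`), the names `ParallelMode` and `countRange` are made up for illustration, and it has not been benchmarked against the timings in the question. It also sizes the chunks with ceiling division so that trailing elements are not skipped when the array length is not a multiple of the job count, and it shuts the pool down when done.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, not taken from the original post.
final class ParallelMode {

    /** Counts occurrences in data[from, to) using a map private to the calling thread. */
    static Map<Integer, Integer> countRange(int[] data, int from, int to) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = from; i < to; i++) {
            counts.merge(data[i], 1, Integer::sum);
        }
        return counts;
    }

    /** Returns a most frequent value (ties broken arbitrarily), or -1 for an empty array. */
    static int computeMode(int[] data) {
        int nThreads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(nThreads);
        try {
            // Ceiling division: the last chunk may be shorter, but no element is dropped.
            int chunk = (data.length + nThreads - 1) / nThreads;
            List<Callable<Map<Integer, Integer>>> jobs = new ArrayList<>();
            for (int start = 0; start < data.length; start += chunk) {
                final int from = start;
                final int to = Math.min(start + chunk, data.length);
                jobs.add(() -> countRange(data, from, to));
            }

            // invokeAll blocks until every job has finished; merge on the calling thread.
            Map<Integer, Integer> total = new HashMap<>();
            for (Future<Map<Integer, Integer>> f : es.invokeAll(jobs)) {
                for (Map.Entry<Integer, Integer> e : f.get().entrySet()) {
                    total.merge(e.getKey(), e.getValue(), Integer::sum);
                }
            }

            // A single sequential pass over the merged counts finds the mode.
            int mode = -1;
            int best = 0;
            for (Map.Entry<Integer, Integer> e : total.entrySet()) {
                if (e.getValue() > best) {
                    best = e.getValue();
                    mode = e.getKey();
                }
            }
            return mode;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("mode computation failed", e);
        } finally {
            es.shutdown();
        }
    }
}
```

Calling `ParallelMode.computeMode(mybigarray)` would take the place of the method above. Whether it is faster than the sequential version still depends on how much time goes into boxing and map updates, so it is worth measuring.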

Upvotes: 1

Alexei Kaigorodov

Reputation: 13515

  1. For each job, use a separate Result object (without synchronization). When all jobs have finished, choose the result with the maximum count.

  2. Use one job per thread: int num_jobs = n_threads;
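The two points above could be sketched roughly as follows. This is an illustration, not the questioner's code: the names `PerJobResult`, `LocalResult`, `scan` and `modeOf` are hypothetical, and Java 8 lambdas are assumed. Since `TreeMap.subMap(fromKey, toKey)` selects by key range rather than by position, the sketch partitions a list copy of the map entries by index instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical names throughout; only the idea comes from the answer above.
final class PerJobResult {

    /** Immutable (mode, count) pair owned by a single job, so no synchronization is needed. */
    static final class LocalResult {
        final int mode;
        final int count;

        LocalResult(int mode, int count) {
            this.mode = mode;
            this.count = count;
        }
    }

    /** Finds the entry with the highest count within one slice of the entries. */
    static LocalResult scan(List<Map.Entry<Integer, Integer>> part) {
        int mode = -1;
        int count = 0;
        for (Map.Entry<Integer, Integer> e : part) {
            if (e.getValue() > count) {
                count = e.getValue();
                mode = e.getKey();
            }
        }
        return new LocalResult(mode, count);
    }

    /** Returns the key with the highest frequency, or -1 if the map is empty. */
    static int modeOf(Map<Integer, Integer> frequencies) {
        // Copy the entries into a list so they can be split by position.
        List<Map.Entry<Integer, Integer>> entries = new ArrayList<>(frequencies.entrySet());
        int nThreads = Runtime.getRuntime().availableProcessors();
        int numJobs = nThreads; // point 2: one job per thread
        ExecutorService es = Executors.newFixedThreadPool(nThreads);
        try {
            int chunk = (entries.size() + numJobs - 1) / numJobs; // ceiling division
            List<Callable<LocalResult>> jobs = new ArrayList<>();
            for (int start = 0; start < entries.size(); start += chunk) {
                final List<Map.Entry<Integer, Integer>> part =
                        entries.subList(start, Math.min(start + chunk, entries.size()));
                jobs.add(() -> scan(part));
            }

            // Point 1: each job returned its own result; pick the maximum afterwards.
            LocalResult best = new LocalResult(-1, 0);
            for (Future<LocalResult> f : es.invokeAll(jobs)) {
                LocalResult r = f.get();
                if (r.count > best.count) {
                    best = r;
                }
            }
            return best.mode;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("mode search failed", e);
        } finally {
            es.shutdown();
        }
    }
}
```

Because every job writes only to its own `LocalResult`, the check-then-update on a shared `Result` goes away, and with it the lock contention on `getCount()` and `setNewMode()`.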

Upvotes: 1
