Mulone

Reputation: 3663

Simple Groovy thread manager

I wrote a little thread manager in Groovy. The manager takes an array, a task to execute on the array, and a chunk size. The array is split into chunks, the task is executed on each chunk in a separate thread, and the results are added to a result array.

This is the class code:

class ParallelManager {

  static def _threads = []
  static def _threadsFinishedCorrectly = []
  static  def _results = []

  static def runParallelTask( def dataArray, def taskFunc, int chunkSize ){
    assert chunkSize > 0
    assert taskFunc
    if (dataArray.size()==0) return

    assert dataArray.size() >= 0

    def subArray = partitionArray(dataArray, chunkSize)
    assert subArray.size() > 0

    subArray.each{ arrChunk->
        _threads.add( Thread.start{

            def chunkResults = taskFunc(arrChunk)
            assert chunkResults != null 
            _results.add(chunkResults) // EXCEPTION HERE
            _threadsFinishedCorrectly.add(true)
        })
    }

    // wait for all threads to finish
    _threads.each{ it.join() }
    log.info("Waiting for all threads to finish...")

    assert _threadsFinishedCorrectly.size() == _threads.size(),'some threads failed.'
    assert _results.size() == _threads.size() 
    log.info("${_threads.size()} finished.")

    return _results
}

/**
* Util function
* @param array
* @param size
* @return
*/
  static def partitionArray(array, size) {
   def partitions = []
   int partitionCount = array.size() / size

   partitionCount.times { partitionNumber ->
       def start = partitionNumber * size
       def end = start + size - 1
       partitions << array[start..end]
   }

   if (array.size() % size) partitions << array[partitionCount * size..-1]
   return partitions
}
}

The manager can be called like this:

  def parallFunc = { array->
        log.info "I'm multiplying $array by 2"
        return array.collect{it*2}
    }

  def results = ParallelManager.runParallelTask( [1,2,3,4,5,6,7,8], parallFunc, 3)

This code occasionally throws the following exception at the line marked above:

 Exception in thread "Thread-3" java.lang.ArrayIndexOutOfBoundsException: 1
 [java]     at java.util.ArrayList.add(ArrayList.java:352)
 [java]     at java_util_List$add.call(Unknown Source)

Do you have a fix for this issue? I think a little thread manager like this one will be useful to many people to speed up common tasks in their code.

Cheers, Mulone

Upvotes: 0

Views: 877

Answers (3)

Mulone

Reputation: 3663

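I solved the issue by using Vector instead of ArrayList. ArrayList is not thread-safe, so concurrent add() calls from several threads can corrupt its internal array, which is what produced the ArrayIndexOutOfBoundsException. Vector's methods are synchronized. Working code below: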

import org.apache.log4j.Logger // assumption: log4j 1.x, whose Logger.getLogger accepts a Class

class ParallelManager {

static def log = Logger.getLogger(ParallelManager)

Vector _threads = []
Vector _threadsFinishedCorrectly = []
Vector _results = []

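/**
 * Splits dataArray into chunks and runs taskFunc on each chunk in its own thread.
 * @param dataArray the input list
 * @param chunkSize maximum number of elements per chunk (must be > 0)
 * @param taskFunc closure that takes a chunk and returns a list of results
 * @return the flattened results of all chunks, in thread completion order (null for empty input)
 */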
def runParallelTasks( def dataArray, int chunkSize, def taskFunc ){
    reset()
    assert chunkSize > 0
    assert taskFunc
    if (dataArray.size()==0) return

    assert dataArray.size() >= 0

    def subArray = partitionArray(dataArray, chunkSize)
    assert subArray.size() > 0

    subArray.each{ arrChunk->
        _threads.add( Thread.start{

            def chunkResults = taskFunc(arrChunk)
            assert chunkResults != null 
            _results.add(chunkResults)
            _threadsFinishedCorrectly.add(true)
        })
    }

    // wait for all threads to finish
    log.debug("Waiting for all threads to finish...")
    _threads.each{ it.join() }

    assert _threadsFinishedCorrectly.size() == _threads.size(),'some threads failed.'
    assert _results.size() == _threads.size() 
    log.debug("${_threads.size()} finished.")
    def res = _results.flatten()    
    //reset()
    assert dataArray.size() == res.size(),"Something went wrong. Some threads did not return their results. results=$res" 
    return res
}

void reset(){
    _threads = []
    _results = []
    _threadsFinishedCorrectly = []
}

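/**
* Splits array into consecutive partitions of at most size elements.
* @param array the list to split
* @param size maximum partition length
* @return a list of partitions; the last one may be shorter
*/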
def partitionArray(array, size) {
   def partitions = []
   int partitionCount = array.size() / size

   partitionCount.times { partitionNumber ->
       def start = partitionNumber * size
       def end = start + size - 1
       partitions << array[start..end]
   }

   if (array.size() % size) partitions << array[partitionCount * size..-1]
   return partitions
  }
}

The manager can be called like this:

someClosure = {
    def resArray = doSomethingOn(it)
    return(resArray)
}
def resultArray = new ParallelManager().runParallelTasks( inputArray, 4, someClosure )
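
One caveat with the code above: results are added in whichever order the threads finish, so the flattened output is not guaranteed to match the input order. Below is a minimal sketch of an order-preserving variant, assuming the input is a List and Groovy 1.8.6 or later (for collate). The name runChunksInOrder is hypothetical and not part of the class above. Each thread writes only to its own slot, so no synchronized collection is needed.

```groovy
// Hypothetical helper, not part of the ParallelManager class above.
def runChunksInOrder(List data, int chunkSize, Closure task) {
    assert chunkSize > 0
    def chunks = data.collate(chunkSize)      // sublists of at most chunkSize elements
    def results = new Object[chunks.size()]   // one slot per chunk, written by exactly one thread
    def threads = []
    chunks.eachWithIndex { chunk, i ->
        threads << Thread.start { results[i] = task(chunk) }
    }
    threads*.join()                           // after join(), each thread's write is visible here
    return results.toList().flatten()         // slots are already in input order
}

def doubled = runChunksInOrder([1, 2, 3, 4, 5, 6, 7, 8], 3) { it.collect { n -> n * 2 } }
assert doubled == [2, 4, 6, 8, 10, 12, 14, 16]
```

If a task throws, its slot stays null, so checking the result for nulls is one way to detect failed chunks.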

Upvotes: 1

tim_yates

Reputation: 171114

This is how you'd do it with GPars:

@Grab( 'org.codehaus.gpars:gpars:0.12' )
import groovyx.gpars.*

def arr = [ 1, 2, 3, 4, 5, 6, 7, 8 ]

arr = GParsPool.withPool {
  arr.collectParallel { it * 2 }
}

Upvotes: 4

stefanglase

Reputation: 10402

Have you heard of the GPars project? It is a proven library that aims to make concurrent programming for multi-core hardware intuitive. It is very powerful when it comes to collection processing.

I would advise relying on this library rather than implementing your own limited thread manager.

Upvotes: 1
