Reputation: 3663
I wrote a small thread manager in Groovy. The manager takes an array, a task to execute on that array, and a chunk size. The array is split into chunks, the task is executed on each chunk in a separate thread, and each chunk's results are added to a shared result array.
This is the class code:
class ParallelManager {

    static def _threads = []
    static def _threadsFinishedCorrectly = []
    static def _results = []

    static def runParallelTask( def dataArray, def taskFunc, int chunkSize ){
        assert chunkSize > 0
        assert taskFunc
        if (dataArray.size()==0) return
        assert dataArray.size() >= 0

        def subArray = partitionArray(dataArray, chunkSize)
        assert subArray.size() > 0

        subArray.each{ arrChunk->
            _threads.add( Thread.start{
                def chunkResults = taskFunc(arrChunk)
                assert chunkResults != null
                _results.add(chunkResults) // EXCEPTION HERE
                _threadsFinishedCorrectly.add(true)
            })
        }

        // wait for all threads to finish
        _threads.each{ it.join() }
        log.info("Waiting for all threads to finish...")

        assert _threadsFinishedCorrectly.size() == _threads.size(),'some threads failed.'
        assert _results.size() == _threads.size()
        log.info("${_threads.size()} finished.")
        return _results
    }

    /**
     * Util function
     * @param array
     * @param size
     * @return
     */
    static def partitionArray(array, size) {
        def partitions = []
        int partitionCount = array.size() / size

        partitionCount.times { partitionNumber ->
            def start = partitionNumber * size
            def end = start + size - 1
            partitions << array[start..end]
        }

        if (array.size() % size) partitions << array[partitionCount * size..-1]
        return partitions
    }
}
The manager can be called like this:
def parallFunc = { array->
    log.info "I'm multiplying $array by 2"
    return array.collect{it*2}
}

def results = ParallelManager.runParallelTask( [1,2,3,4,5,6,7,8], parallFunc, 3)
This code occasionally throws this Exception at the line marked above:
Exception in thread "Thread-3" java.lang.ArrayIndexOutOfBoundsException: 1
[java] at java.util.ArrayList.add(ArrayList.java:352)
[java] at java_util_List$add.call(Unknown Source)
Do you have a fix for this issue? I think a small thread manager like this one would be useful to many people for speeding up common tasks in their code.
Cheers, Mulone
Upvotes: 0
Views: 877
Reputation: 3663
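I solved the issue by using Vectors instead of ArrayLists. ArrayList is not thread-safe, so concurrent add() calls from several threads can corrupt its internal array, whereas Vector synchronizes each method call. Working code below: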
// Assumption: Logger is org.apache.log4j.Logger (the import was not shown in the original post).
class ParallelManager {

    static def log = Logger.getLogger(ParallelManager)

    // Vector synchronizes every method call, so concurrent add() is safe
    Vector _threads = []
    Vector _threadsFinishedCorrectly = []
    Vector _results = []

    /**
     * Splits dataArray into chunks and runs taskFunc on each chunk in its own thread.
     * @param dataArray the input list
     * @param chunkSize maximum number of elements per chunk
     * @param taskFunc closure applied to each chunk; must return a list of results
     * @return the flattened results of all chunks
     */
    def runParallelTasks( def dataArray, int chunkSize, def taskFunc ){
        reset()
        assert chunkSize > 0
        assert taskFunc
        if (dataArray.size()==0) return
        assert dataArray.size() >= 0

        def subArray = partitionArray(dataArray, chunkSize)
        assert subArray.size() > 0

        subArray.each{ arrChunk->
            _threads.add( Thread.start{
                def chunkResults = taskFunc(arrChunk)
                assert chunkResults != null
                _results.add(chunkResults)
                _threadsFinishedCorrectly.add(true)
            })
        }

        // wait for all threads to finish
        log.debug("Waiting for all threads to finish...")
        _threads.each{ it.join() }

        assert _threadsFinishedCorrectly.size() == _threads.size(),'some threads failed.'
        assert _results.size() == _threads.size()
        log.debug("${_threads.size()} finished.")

        // Chunks are appended in completion order, so the flattened
        // result is not guaranteed to follow the input order.
        def res = _results.flatten()
        //reset()
        assert dataArray.size() == res.size(),"Something went wrong. Some threads did not return their results. results=$res"
        return res
    }

    void reset(){
        _threads = []
        _results = []
        _threadsFinishedCorrectly = []
    }

    /**
     * Splits array into consecutive sublists of at most size elements.
     * @param array the list to split
     * @param size maximum number of elements per sublist
     * @return a list of sublists
     */
    def partitionArray(array, size) {
        def partitions = []
        int partitionCount = array.size() / size

        partitionCount.times { partitionNumber ->
            def start = partitionNumber * size
            def end = start + size - 1
            partitions << array[start..end]
        }

        if (array.size() % size) partitions << array[partitionCount * size..-1]
        return partitions
    }
}
The manager can be called like this:
someClosure = {
    def resArray = doSomethingOn(it)
    return(resArray)
}

def resultArray = new ParallelManager().runParallelTasks( inputArray, 4, someClosure )
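
For comparison, here is a minimal sketch of the same idea without a shared result list, using the JDK's ExecutorService. It is not the code from this answer: runChunksInParallel is a hypothetical helper name, the pool size is an arbitrary choice, and collate() assumes Groovy 1.8.6 or later (on older versions the partitionArray method above could be used instead). Each worker returns its chunk's results through a Future, and invokeAll returns the futures in submission order, so the results keep the input order and no list is written to from more than one thread.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper, not part of the ParallelManager class above.
def runChunksInParallel(List data, int chunkSize, Closure taskFunc) {
    assert chunkSize > 0
    if (!data) return []

    // collate() splits the list into sublists of at most chunkSize elements
    def chunks = data.collate(chunkSize)

    // Pool size is an assumption: at most one thread per available core
    int poolSize = Math.min(chunks.size(), Runtime.runtime.availableProcessors())
    def pool = Executors.newFixedThreadPool(poolSize)
    try {
        // One Callable per chunk; each returns its own result list
        def tasks = chunks.collect { chunk ->
            return ({ taskFunc(chunk) } as Callable)
        }
        // invokeAll blocks until all tasks are done and keeps submission order
        def futures = pool.invokeAll(tasks)
        // get() rethrows a worker failure as an ExecutionException
        return futures.collect { it.get() }.flatten()
    } finally {
        pool.shutdown()
    }
}

def doubled = runChunksInParallel([1, 2, 3, 4, 5, 6, 7, 8], 3) { chunk ->
    chunk.collect { it * 2 }
}
assert doubled == [2, 4, 6, 8, 10, 12, 14, 16]
```

Because a failed task surfaces as an exception from get(), this sketch does not need a separate "finished correctly" list.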
Upvotes: 1
Reputation: 171114
This is how you'd do it with GPars:
@Grab( 'org.codehaus.gpars:gpars:0.12' )
import groovyx.gpars.*

def arr = [ 1, 2, 3, 4, 5, 6, 7, 8 ]

arr = GParsPool.withPool {
    arr.collectParallel { it * 2 }
}
Upvotes: 4
Reputation: 10402
Have you heard of the GPars project? It is a proven library that aims to make concurrent programming for multi-core hardware intuitive, and it is very powerful when it comes to collection processing.
I would advise you to rely on this library instead of implementing your own limited version of a simple thread manager.
Upvotes: 1