Reputation: 295
I have this array of HashMaps, defined as below:
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer}

var distinctElementsDefinitionMap: ArrayBuffer[HashMap[String, Int]] =
  new ArrayBuffer[HashMap[String, Int]](300) with SynchronizedBuffer[HashMap[String, Int]]
Now, I have a parallel collection of 300 elements
val max_length = 300
val columnArray = (0 until max_length).toParArray
import scala.collection.parallel.ForkJoinTaskSupport
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100))
columnArray foreach { i =>
  // Do some computation and get a HashMap
  val distinctElementsMap: HashMap[String, Int] = //Some Value
  // This line might result in a ConcurrentModificationException
  distinctElementsDefinitionMap.update(i, distinctElementsMap)
}
I am now running a computation-intensive task within a foreach loop on the columnArray defined above. After the computation is complete, I would like each of the threads to update a particular entry of the distinctElementsDefinitionMap array.
Each thread would update only a particular index, unique to the thread executing it. I want to know: is updating an entry of the array this way safe with multiple threads possibly writing to it at the same time? If not, is there a synchronized way of doing it so that it's thread-safe?
Thank You!
Update:
It appears this is really not a safe way to do it: I am getting a java.util.ConcurrentModificationException. Any tips on how to avoid this whilst using the parallel collections?
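For reference, the direction I am experimenting with instead is to pre-allocate a plain fixed-size Array and have each task write only to its own slot, so there are no structural changes to the collection after allocation. This is only a sketch of the idea (assuming Scala 2.11, where scala.concurrent.forkjoin.ForkJoinPool is still available, and with HashMap.empty standing in for my real computation), not a verified solution:

import scala.collection.mutable.HashMap
import scala.collection.parallel._

val max_length = 300

// Fixed size, allocated up front: no resizing or structural
// changes can happen while the tasks are running.
val distinctElementsDefinitionMap = new Array[HashMap[String, Int]](max_length)

val columnArray = (0 until max_length).toParArray
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100))

columnArray foreach { i =>
  val distinctElementsMap: HashMap[String, Int] = HashMap.empty // stand-in for the real computation
  // Each task writes only to its own slot i; completion of the
  // parallel foreach happens-before any subsequent read of the array.
  distinctElementsDefinitionMap(i) = distinctElementsMap
}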
Upvotes: 0
Views: 1189
Reputation: 62835
Use the .groupBy operation; as far as I can judge, it is parallelized (unlike some other methods, such as .sorted).
case class Row(a: String, b: String, c: String)
val data = Vector(
  Row("foo", "", ""),
  Row("bar", "", ""),
  Row("foo", "", "")
)
data.par.groupBy(x => x.a).seq
// Map(bar -> ParVector(Row(bar,,)), foo -> ParVector(Row(foo,,), Row(foo,,)))
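Note the .seq at the end: the grouping itself runs in parallel, and .seq just converts the resulting parallel Map back into an ordinary sequential one once the work is done.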
Hope you got the idea.
Alternatively, if your RAM allows it, parallelize the processing over each column rather than each row; it should be way more efficient than your current approach (less contention).
val columnsCount = 3 // 300 in your case

Vector.range(0, columnsCount).par.map { column =>
  // Row is a case class, so pick the column via productElement
  data.groupBy(row => row.productElement(column))
}.seq
Though you will likely have memory problems even with a single column (8M rows might be quite a lot).
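And if what you ultimately need is the per-column counts of distinct values (which is what your distinctElementsMap suggests), the same shape gives you that directly. A sketch, assuming the rows are plain indexable Vector[String]s rather than a case class:

// Hypothetical indexable rows, standing in for your real data
val rows: Vector[Vector[String]] = Vector(
  Vector("foo", "x", "1"),
  Vector("bar", "y", "1"),
  Vector("foo", "x", "2")
)

val columnsCount = 3

// One parallel task per column: count the occurrences of each distinct value
val distinctCounts: Vector[Map[String, Int]] =
  Vector.range(0, columnsCount).par.map { column =>
    rows.groupBy(row => row(column)).mapValues(_.size).toMap
  }.seq
// distinctCounts(0) == Map("foo" -> 2, "bar" -> 1)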
Upvotes: 0